# Asynchronous File Reading

Line-by-line processing of standard input or large files without loading the entire input into memory, with processing statistics and automatic flow control.

## Capabilities

### Main Reading Function

Processes input streams line by line asynchronously, with automatic backpressure handling and statistics collection.
```typescript { .api }
/**
 * Process input stream line by line asynchronously
 * @param lineHandler - Function to process each line
 * @param input - Input stream (defaults to process.stdin)
 * @returns Promise that resolves with processing statistics when all lines are processed
 */
function read(
  lineHandler: LineHandler,
  input?: NodeJS.ReadableStream
): Promise<Stats>;

/**
 * Line processing function
 * @param line - The line content as a string
 * @param index - Zero-based line number
 * @returns Promise that resolves when line processing is complete
 */
type LineHandler = (line: string, index: number) => Promise<any>;
```
**Usage Examples:**

```typescript
import { read } from "stdio";

// Basic line processing
const stats = await read(async (line, index) => {
  console.log(`Line ${index}: ${line}`);
});
console.log(`Processed ${stats.length} lines in ${stats.timeAverage}ms average`);

// Data transformation with periodic progress reporting
await read(async (line, index) => {
  const data = JSON.parse(line);
  const processed = await processData(data);
  await saveToDatabase(processed);

  if (index % 1000 === 0) {
    console.log(`Processed ${index} lines`);
  }
});

// Reading from a file instead of stdin
import { createReadStream } from 'fs';

const fileStream = createReadStream('large-file.txt');
await read(async (line, index) => {
  // Process each line from the file
  await processLine(line);
}, fileStream);

// Error handling in line processing
await read(async (line, index) => {
  try {
    const result = await riskyOperation(line);
    console.log(`Line ${index}: ${result}`);
  } catch (error) {
    console.error(`Failed to process line ${index}: ${error.message}`);
    // Rethrowing here stops processing and rejects the main promise
    throw error;
  }
});
```
### Statistics Interface

Provides processing statistics, including timing information and line counts.

```typescript { .api }
interface Stats {
  /** Total number of lines processed */
  length: number;
  /** Processing time for each line in milliseconds */
  times: number[];
  /** Average processing time per line in milliseconds */
  timeAverage: number;
}
```
**Statistics are automatically calculated:**

- `length`: incremented for each line received
- `times`: array of per-line processing times for performance analysis
- `timeAverage`: computed average of the processing times
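Beyond `timeAverage`, the raw `times` array supports more detailed analysis. A minimal sketch (the `summarize` helper is hypothetical, not part of the library):

```typescript
interface Stats {
  length: number;
  times: number[];
  timeAverage: number;
}

// Hypothetical helper: derive extra timing metrics from the times array
function summarize(stats: Stats): { min: number; max: number; p95: number } {
  const sorted = [...stats.times].sort((a, b) => a - b);
  const p95Index = Math.min(sorted.length - 1, Math.floor(sorted.length * 0.95));
  return { min: sorted[0], max: sorted[sorted.length - 1], p95: sorted[p95Index] };
}

// Example with a hand-made Stats object
const stats: Stats = { length: 4, times: [2, 5, 3, 40], timeAverage: 12.5 };
console.log(summarize(stats)); // { min: 2, max: 40, p95: 40 }
```

A single slow outlier can dominate `timeAverage`, so percentile figures from `times` are often more informative.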
### Internal State Management

The read function maintains internal state covering flow control, statistics collection, and promise settlement, balancing performance and memory usage.
```typescript { .api }
interface State {
  /** Buffer of lines waiting to be processed */
  buffer: string[];
  /** Whether the input stream is still open */
  isOpen: boolean;
  /** Processing statistics */
  stats: Stats;
  /** Readline interface for stream handling */
  reader: ReadLine;
  /** Promise resolution function */
  resolve: Function;
  /** Promise rejection function */
  reject: Function;
  /** User-provided line processing function */
  lineHandler: LineHandler;
  /** Current line index (zero-based) */
  index: number;
}
```
## Advanced Features

### Automatic Flow Control

The read function automatically manages stream flow to prevent memory issues:

- **Backpressure Handling**: pauses the input stream during line processing
- **Buffer Management**: maintains a small line buffer to prevent memory overflow
- **Resume Logic**: resumes stream reading when processing completes
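The pause-while-processing pattern can be sketched with Node's `readline` module, whose async iterator pauses the underlying stream until the consumer is ready for the next line. This is a simplified illustration of the technique, not the library's actual implementation:

```typescript
import { createInterface } from 'readline';
import { Readable } from 'stream';

// Simplified sketch of the pause/resume backpressure pattern;
// not the library's actual implementation.
async function readLines(
  input: NodeJS.ReadableStream,
  lineHandler: (line: string, index: number) => Promise<any>
): Promise<number> {
  const reader = createInterface({ input });
  let index = 0;
  for await (const line of reader) {
    // The async iterator pauses the underlying stream until the handler
    // resolves, so only one line is in flight at a time
    await lineHandler(line, index++);
  }
  return index;
}

// Usage with an in-memory stream standing in for stdin or a file
const stream = Readable.from(['alpha\nbeta\ngamma\n']);
readLines(stream, async (line, i) => console.log(`${i}: ${line}`))
  .then((count) => console.log(`done: ${count} lines`));
```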
### Memory Efficiency

Designed for processing arbitrarily large files:

- **Streaming Processing**: holds only one line in memory at a time
- **No Accumulation**: does not accumulate results (the caller decides what to store)
- **Automatic Cleanup**: properly closes streams and clears buffers
### Error Propagation

Comprehensive error handling throughout the processing pipeline:

```typescript
// Errors thrown in the line handler stop processing
// and cause the main read promise to reject
await read(async (line, index) => {
  if (line.startsWith('ERROR')) {
    throw new Error(`Invalid line at ${index}`);
  }
});

// Stream errors are properly propagated
const brokenStream = new BrokenReadableStream();
try {
  await read(lineHandler, brokenStream);
} catch (error) {
  console.error('Stream error:', error);
}
```
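When a run should survive bad lines, the complement of the fail-fast behavior above is a handler that records failures and keeps going. A sketch of that pattern, shown against an in-memory line array so it runs standalone; with the library, the same `lineHandler` would be passed to `read`:

```typescript
// Tolerant line handler: record failures instead of rethrowing,
// so processing continues and the main promise still resolves
const failures: { index: number; message: string }[] = [];

const lineHandler = async (line: string, index: number): Promise<void> => {
  try {
    JSON.parse(line); // stand-in for real per-line work
  } catch (error: any) {
    failures.push({ index, message: error.message });
  }
};

// Simulated input, standing in for read(lineHandler) over a real stream
(async () => {
  const lines = ['{"ok":1}', 'not json', '{"ok":2}'];
  for (const [i, line] of lines.entries()) {
    await lineHandler(line, i);
  }
  console.log(`Completed with ${failures.length} bad line(s)`); // 1 bad line
})();
```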
### Performance Monitoring

Built-in timing statistics for performance analysis:

```typescript
// Stats are automatically collected for every line:
// - processing time for this line
// - running average of all processing times
// - total line count
const stats = await read(async (line, index) => {
  // Long-running processing
  await complexOperation(line);
});

// Access the final statistics through the resolved promise
console.log(`Processed ${stats.length} lines`);
console.log(`Average time: ${stats.timeAverage}ms per line`);
```
## Integration with Node.js Streams

Works seamlessly with any Node.js readable stream:

```typescript
import { createReadStream } from 'fs';
import { createGunzip } from 'zlib';
import { read } from 'stdio';

// Reading compressed files
const gzipStream = createReadStream('data.txt.gz').pipe(createGunzip());
await read(async (line) => {
  // Process decompressed lines
}, gzipStream);

// Reading from HTTP responses
import { get } from 'https';

get('https://api.example.com/data', (response) => {
  read(async (line) => {
    const record = JSON.parse(line);
    await processRecord(record);
  }, response);
});
```