0
# Stream Processing
1
2
Node.js stream integration for parsing large inputs or continuous data streams.
3
4
## Capabilities
5
6
### StreamWrapper Class
7
8
Writable stream wrapper that feeds data to a nearley parser as it arrives.
9
10
```javascript { .api }
11
/**
12
* Create a writable stream wrapper for a nearley parser
13
* @param parser - Nearley Parser instance to feed data to
14
*/
15
class StreamWrapper extends require('stream').Writable {
16
constructor(parser: Parser);
17
}
18
19
/**
20
* StreamWrapper instance properties
21
*/
22
interface StreamWrapper {
23
/** The wrapped parser instance */
24
_parser: Parser;
25
}
26
```
27
28
**Usage Examples:**
29
30
```javascript
31
const fs = require("fs");
32
const nearley = require("nearley");
33
const StreamWrapper = require("nearley/lib/stream");
34
35
// Create parser
36
const grammar = nearley.Grammar.fromCompiled(require("./grammar.js"));
37
const parser = new nearley.Parser(grammar);
38
39
// Create stream wrapper
40
const stream = new StreamWrapper(parser);
41
42
// Handle stream events
43
stream.on('finish', () => {
44
console.log("Parsing complete");
45
console.log("Results:", parser.results);
46
});
47
48
stream.on('error', (error) => {
49
console.error("Stream error:", error.message);
50
});
51
52
// Pipe file through parser
53
fs.createReadStream('input.txt', 'utf8').pipe(stream);
54
```
55
56
### Stream Write Implementation
57
58
The internal write method that processes chunks.
59
60
```javascript { .api }
61
/**
62
* Internal stream write implementation
63
* @param chunk - Data chunk to process
64
* @param encoding - Character encoding
65
* @param callback - Completion callback
66
*/
67
_write(chunk: Buffer, encoding: string, callback: Function): void;
68
```
69
70
**Usage Examples:**
71
72
```javascript
73
const StreamWrapper = require("nearley/lib/stream");
74
75
// Manual stream writing
76
const stream = new StreamWrapper(parser);
77
78
// Write data chunks manually
79
stream.write("first chunk");
80
stream.write(" second chunk");
81
stream.end(); // Signal end of input
82
83
// Or use with any readable stream
84
process.stdin.pipe(stream);
85
```
86
87
### Processing Large Files
88
89
Handle large files that don't fit in memory.
90
91
**Usage Examples:**
92
93
```javascript
94
const fs = require("fs");
95
const readline = require("readline");
96
const nearley = require("nearley");
97
const StreamWrapper = require("nearley/lib/stream");
98
99
// Example: Parse line-by-line for very large files
100
async function parseHugeFile(filename) {
101
const grammar = nearley.Grammar.fromCompiled(require("./line-grammar.js"));
102
const parser = new nearley.Parser(grammar);
103
const stream = new StreamWrapper(parser);
104
105
const fileStream = fs.createReadStream(filename);
106
const rl = readline.createInterface({
107
input: fileStream,
108
crlfDelay: Infinity
109
});
110
111
// Process line by line
112
for await (const line of rl) {
113
stream.write(line + '\n');
114
}
115
116
stream.end();
117
return parser.results;
118
}
119
120
// Usage
121
parseHugeFile('huge-data.txt')
122
.then(results => console.log("Parsed results:", results))
123
.catch(error => console.error("Parse error:", error));
124
```
125
126
### Real-time Stream Processing
127
128
Handle continuous data streams like network connections.
129
130
**Usage Examples:**
131
132
```javascript
133
const net = require("net");
134
const nearley = require("nearley");
135
const StreamWrapper = require("nearley/lib/stream");
136
137
// Example: TCP server that parses incoming data
138
const server = net.createServer((socket) => {
139
console.log("Client connected");
140
141
// Create parser for this connection
142
const grammar = nearley.Grammar.fromCompiled(require("./protocol-grammar.js"));
143
const parser = new nearley.Parser(grammar);
144
const stream = new StreamWrapper(parser);
145
146
// Handle complete messages
147
stream.on('finish', () => {
148
if (parser.results.length > 0) {
149
const message = parser.results[0];
150
console.log("Received message:", message);
151
152
// Send response
153
socket.write("Message received\n");
154
}
155
});
156
157
// Pipe socket data through parser
158
socket.pipe(stream);
159
160
socket.on('end', () => {
161
console.log("Client disconnected");
162
});
163
});
164
165
server.listen(3000, () => {
166
console.log("Parser server listening on port 3000");
167
});
168
```
169
170
### Error Handling in Streams
171
172
Handle parsing errors in streaming contexts.
173
174
**Usage Examples:**
175
176
```javascript
177
const fs = require("fs");
178
const nearley = require("nearley");
179
const StreamWrapper = require("nearley/lib/stream");
180
181
function parseFileStream(filename) {
182
return new Promise((resolve, reject) => {
183
const grammar = nearley.Grammar.fromCompiled(require("./grammar.js"));
184
const parser = new nearley.Parser(grammar);
185
const stream = new StreamWrapper(parser);
186
187
// Handle successful completion
188
stream.on('finish', () => {
189
if (parser.results.length === 0) {
190
reject(new Error("No valid parse found"));
191
} else {
192
resolve(parser.results);
193
}
194
});
195
196
// Handle stream errors
197
stream.on('error', (error) => {
198
reject(new Error(`Stream error: ${error.message}`));
199
});
200
201
// Create file stream with error handling
202
const fileStream = fs.createReadStream(filename, 'utf8');
203
204
fileStream.on('error', (error) => {
205
reject(new Error(`File error: ${error.message}`));
206
});
207
208
// Pipe with error propagation
209
fileStream.pipe(stream);
210
});
211
}
212
213
// Usage with error handling
214
parseFileStream('data.txt')
215
.then(results => {
216
console.log("Parse successful:", results);
217
})
218
.catch(error => {
219
console.error("Parse failed:", error.message);
220
});
221
```
222
223
### Stream Options and Configuration
224
225
Configure stream behavior for different use cases.
226
227
**Usage Examples:**
228
229
```javascript
230
const nearley = require("nearley");
231
const StreamWrapper = require("nearley/lib/stream");
232
233
// Parser with history for stream error recovery
234
const parser = new nearley.Parser(grammar, {
235
keepHistory: true,
236
lexer: customLexer
237
});
238
239
const stream = new StreamWrapper(parser);
240
241
// Configure stream options
242
stream.setDefaultEncoding('utf8');
243
244
// Handle backpressure for high-volume streams
245
let backpressure = false;
246
247
stream.on('drain', () => {
248
backpressure = false;
249
console.log("Stream ready for more data");
250
});
251
252
function writeToStream(data) {
253
if (!backpressure) {
254
backpressure = !stream.write(data);
255
if (backpressure) {
256
console.log("Stream backpressure detected");
257
}
258
}
259
}
260
```
261
262
## Integration with Default Lexer
263
264
```javascript { .api }
265
/**
266
* Default streaming lexer for character-by-character parsing
267
*/
268
class StreamLexer {
269
constructor();
270
reset(data: string, state?: object): void;
271
next(): {value: string} | undefined;
272
save(): {line: number, col: number};
273
formatError(token: object, message: string): string;
274
}
275
```
276
277
**Usage Examples:**
278
279
```javascript
280
const nearley = require("nearley");
281
const StreamWrapper = require("nearley/lib/stream");
282
283
// Using default StreamLexer with streams
284
const grammar = nearley.Grammar.fromCompiled(require("./char-grammar.js"));
285
const parser = new nearley.Parser(grammar); // Uses StreamLexer by default
286
287
const stream = new StreamWrapper(parser);
288
289
// The StreamLexer will process input character by character
290
// and provide line/column information for errors
291
stream.write("input text with\nmultiple lines");
292
stream.end();
293
294
// Error messages will include line/column info:
295
// "Syntax error at line 2 col 5: ..."
296
```