or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-tools.md · core-parsing.md · grammar-management.md · index.md · stream-processing.md · text-generation.md

stream-processing.md · docs/

0

# Stream Processing

1

2

Node.js stream integration for parsing large inputs or continuous data streams.

3

4

## Capabilities

5

6

### StreamWrapper Class

7

8

Writable stream wrapper that feeds data to a nearley parser as it arrives.

9

10

```javascript { .api }

11

/**

12

* Create a writable stream wrapper for a nearley parser

13

* @param parser - Nearley Parser instance to feed data to

14

*/

15

class StreamWrapper extends require('stream').Writable {

16

constructor(parser: Parser);

17

}

18

19

/**

20

* StreamWrapper instance properties

21

*/

22

interface StreamWrapper {

23

/** The wrapped parser instance */

24

_parser: Parser;

25

}

26

```

27

28

**Usage Examples:**

29

30

```javascript

31

const fs = require("fs");

32

const nearley = require("nearley");

33

const StreamWrapper = require("nearley/lib/stream");

34

35

// Create parser

36

const grammar = nearley.Grammar.fromCompiled(require("./grammar.js"));

37

const parser = new nearley.Parser(grammar);

38

39

// Create stream wrapper

40

const stream = new StreamWrapper(parser);

41

42

// Handle stream events

43

stream.on('finish', () => {

44

console.log("Parsing complete");

45

console.log("Results:", parser.results);

46

});

47

48

stream.on('error', (error) => {

49

console.error("Stream error:", error.message);

50

});

51

52

// Pipe file through parser

53

fs.createReadStream('input.txt', 'utf8').pipe(stream);

54

```

55

56

### Stream Write Implementation

57

58

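The internal `_write` method that the Node.js stream machinery invokes for each chunk written to the wrapper. Application code does not call it directly; use `write()`, `end()`, or `pipe()` as shown below.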

59

60

```javascript { .api }

61

/**

62

* Internal stream write implementation

63

* @param chunk - Data chunk to process

64

* @param encoding - Character encoding

65

* @param callback - Completion callback

66

*/

67

_write(chunk: Buffer, encoding: string, callback: Function): void;

68

```

69

70

**Usage Examples:**

71

72

```javascript

73

const StreamWrapper = require("nearley/lib/stream");

74

75

// Manual stream writing

76

const stream = new StreamWrapper(parser);

77

78

// Write data chunks manually

79

stream.write("first chunk");

80

stream.write(" second chunk");

81

stream.end(); // Signal end of input

82

83

// Or use with any readable stream

84

process.stdin.pipe(stream);

85

```

86

87

### Processing Large Files

88

89

Handle files that are too large to read into memory at once by feeding them to the parser incrementally.

90

91

**Usage Examples:**

92

93

```javascript

94

const fs = require("fs");

95

const readline = require("readline");

96

const nearley = require("nearley");

97

const StreamWrapper = require("nearley/lib/stream");

98

99

// Example: Parse line-by-line for very large files

100

async function parseHugeFile(filename) {

101

const grammar = nearley.Grammar.fromCompiled(require("./line-grammar.js"));

102

const parser = new nearley.Parser(grammar);

103

const stream = new StreamWrapper(parser);

104

105

const fileStream = fs.createReadStream(filename);

106

const rl = readline.createInterface({

107

input: fileStream,

108

crlfDelay: Infinity

109

});

110

111

// Process line by line

112

for await (const line of rl) {

113

stream.write(line + '\n');

114

}

115

116

stream.end();

117

return parser.results;

118

}

119

120

// Usage

121

parseHugeFile('huge-data.txt')

122

.then(results => console.log("Parsed results:", results))

123

.catch(error => console.error("Parse error:", error));

124

```

125

126

### Real-time Stream Processing

127

128

Handle continuous data streams like network connections.

129

130

**Usage Examples:**

131

132

```javascript

133

const net = require("net");

134

const nearley = require("nearley");

135

const StreamWrapper = require("nearley/lib/stream");

136

137

// Example: TCP server that parses incoming data

138

const server = net.createServer((socket) => {

139

console.log("Client connected");

140

141

// Create parser for this connection

142

const grammar = nearley.Grammar.fromCompiled(require("./protocol-grammar.js"));

143

const parser = new nearley.Parser(grammar);

144

const stream = new StreamWrapper(parser);

145

146

// Handle complete messages

147

stream.on('finish', () => {

148

if (parser.results.length > 0) {

149

const message = parser.results[0];

150

console.log("Received message:", message);

151

152

// Send response

153

socket.write("Message received\n");

154

}

155

});

156

157

// Pipe socket data through parser

158

socket.pipe(stream);

159

160

socket.on('end', () => {

161

console.log("Client disconnected");

162

});

163

});

164

165

server.listen(3000, () => {

166

console.log("Parser server listening on port 3000");

167

});

168

```

169

170

### Error Handling in Streams

171

172

Handle parsing errors in streaming contexts.

173

174

**Usage Examples:**

175

176

```javascript

177

const fs = require("fs");

178

const nearley = require("nearley");

179

const StreamWrapper = require("nearley/lib/stream");

180

181

function parseFileStream(filename) {

182

return new Promise((resolve, reject) => {

183

const grammar = nearley.Grammar.fromCompiled(require("./grammar.js"));

184

const parser = new nearley.Parser(grammar);

185

const stream = new StreamWrapper(parser);

186

187

// Handle successful completion

188

stream.on('finish', () => {

189

if (parser.results.length === 0) {

190

reject(new Error("No valid parse found"));

191

} else {

192

resolve(parser.results);

193

}

194

});

195

196

// Handle stream errors

197

stream.on('error', (error) => {

198

reject(new Error(`Stream error: ${error.message}`));

199

});

200

201

// Create file stream with error handling

202

const fileStream = fs.createReadStream(filename, 'utf8');

203

204

fileStream.on('error', (error) => {

205

reject(new Error(`File error: ${error.message}`));

206

});

207

208

// Pipe the file into the parser stream. pipe() does not forward errors, so each stream has its own 'error' handler above

209

fileStream.pipe(stream);

210

});

211

}

212

213

// Usage with error handling

214

parseFileStream('data.txt')

215

.then(results => {

216

console.log("Parse successful:", results);

217

})

218

.catch(error => {

219

console.error("Parse failed:", error.message);

220

});

221

```

222

223

### Stream Options and Configuration

224

225

Configure stream behavior for different use cases.

226

227

**Usage Examples:**

228

229

```javascript

230

const nearley = require("nearley");

231

const StreamWrapper = require("nearley/lib/stream");

232

233

// Parser with history for stream error recovery

234

const parser = new nearley.Parser(grammar, {

235

keepHistory: true,

236

lexer: customLexer

237

});

238

239

const stream = new StreamWrapper(parser);

240

241

// Configure stream options

242

stream.setDefaultEncoding('utf8');

243

244

// Handle backpressure for high-volume streams

245

let backpressure = false;

246

247

stream.on('drain', () => {

248

backpressure = false;

249

console.log("Stream ready for more data");

250

});

251

252

function writeToStream(data) {

253

if (!backpressure) {

254

backpressure = !stream.write(data);

255

if (backpressure) {

256

console.log("Stream backpressure detected");

257

}

258

}

259

}

260

```

261

262

## Integration with Default Lexer

263

264

```javascript { .api }

265

/**

266

* Default streaming lexer for character-by-character parsing

267

*/

268

class StreamLexer {

269

constructor();

270

reset(data: string, state?: object): void;

271

next(): {value: string} | undefined;

272

save(): {line: number, col: number};

273

formatError(token: object, message: string): string;

274

}

275

```

276

277

**Usage Examples:**

278

279

```javascript

280

const nearley = require("nearley");

281

const StreamWrapper = require("nearley/lib/stream");

282

283

// Using default StreamLexer with streams

284

const grammar = nearley.Grammar.fromCompiled(require("./char-grammar.js"));

285

const parser = new nearley.Parser(grammar); // Uses StreamLexer by default

286

287

const stream = new StreamWrapper(parser);

288

289

// The StreamLexer will process input character by character

290

// and provide line/column information for errors

291

stream.write("input text with\nmultiple lines");

292

stream.end();

293

294

// Error messages will include line/column info:

295

// "Syntax error at line 2 col 5: ..."

296

```