
# Stream Processing

Oboe.js provides Node.js-specific capabilities for processing readable streams, including file streams, HTTP response streams, and any other Node.js readable stream source. This enables server-side JSON processing from various sources without loading entire files into memory.

## Capabilities

### Stream Constructor

Create an oboe instance that processes a Node.js readable stream.

```javascript { .api }
/**
 * Create oboe instance for processing a readable stream
 * @param {ReadableStream} stream - Node.js readable stream
 * @returns {OboeInstance} Configured oboe instance
 */
function oboe(stream: ReadableStream): OboeInstance;
```

**Stream Type Support:**

- File streams (`fs.createReadStream()`)
- HTTP/HTTPS response streams
- Transform streams (see the gzip sketch after this list)
- Any Node.js readable stream with standard methods
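Since any readable stream works, the output of a transform stream can be fed straight to oboe. A minimal sketch, assuming a gzip-compressed JSON file (the `records.json.gz` path and `!.records.*` layout are hypothetical):

```javascript
const oboe = require('oboe');
const fs = require('fs');
const zlib = require('zlib');

// Decompress on the fly; oboe consumes the gunzipped JSON text
const gunzipped = fs.createReadStream('records.json.gz')
  .pipe(zlib.createGunzip());

oboe(gunzipped)
  .node('!.records.*', function(record) {
    console.log('Record:', record.id);
  });
```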

**Usage Examples:**

```javascript
const oboe = require('oboe');
const fs = require('fs');
const http = require('http');

// File stream processing
const fileStream = fs.createReadStream('large-data.json');
oboe(fileStream)
  .node('!.records.*', function(record) {
    console.log('Processing record:', record.id);
  })
  .done(function(json) {
    console.log('File processing complete');
  });

// HTTP response stream
http.get('http://api.example.com/stream', function(res) {
  oboe(res)
    .node('!.items.*', processItem)
    .fail(handleNetworkError);
});

// HTTPS with custom options
const https = require('https');
const options = {
  hostname: 'api.example.com',
  path: '/secure-data',
  headers: { 'Authorization': 'Bearer token123' }
};

https.get(options, function(res) {
  oboe(res)
    .start(function(statusCode, headers) {
      console.log('Response status:', statusCode);
    })
    .node('!.data.*', processSecureData);
});
```

### File Stream Processing

Process JSON files without loading them entirely into memory.

```javascript { .api }
/**
 * Process JSON files as streams
 * Best for large JSON files that exceed memory limits
 */
```

**File Processing Examples:**

```javascript
const fs = require('fs');
const path = require('path');

// Large JSON file processing
const filePath = path.join(__dirname, 'data', 'huge-dataset.json');
const fileStream = fs.createReadStream(filePath, { encoding: 'utf8' });

oboe(fileStream)
  .node('!.transactions.*', function(transaction) {
    // Process each transaction as it's parsed
    if (transaction.amount > 1000) {
      console.log('Large transaction:', transaction.id, transaction.amount);
    }
  })
  .node('!.metadata', function(metadata) {
    console.log('Dataset metadata:', metadata);
  })
  .done(function() {
    console.log('File processing completed');
  })
  .fail(function(error) {
    console.error('File processing error:', error);
  });

// Multiple file processing
const files = ['data1.json', 'data2.json', 'data3.json'];

files.forEach(function(filename) {
  const stream = fs.createReadStream(filename);

  oboe(stream)
    .node('!.items.*', function(item) {
      console.log(`Item from ${filename}:`, item);
    })
    .fail(function(error) {
      console.error(`Error processing ${filename}:`, error);
    });
});
```

### HTTP Response Stream Processing

Process HTTP responses as they arrive, ideal for APIs that return large JSON responses.

```javascript { .api }
/**
 * Process HTTP responses progressively
 * Reduces memory usage and improves response time for large APIs
 */
```

**HTTP Stream Examples:**

```javascript
const http = require('http');
const https = require('https');

// Simple HTTP GET stream
http.get('http://api.example.com/large-dataset', function(response) {
  oboe(response)
    .start(function(statusCode, headers) {
      console.log('Response started:', statusCode);
      console.log('Content-Length:', headers['content-length']);
    })
    .node('!.results.*', function(result, path) {
      console.log(`Result ${path[1]}:`, result.title);
    })
    .done(function(json) {
      console.log('Total results:', json.results.length);
    });
});

// HTTPS with request configuration
const requestOptions = {
  hostname: 'api.example.com',
  port: 443,
  path: '/v2/search?q=node.js&limit=1000',
  method: 'GET',
  headers: {
    'User-Agent': 'NodeJS Stream Client',
    'Accept': 'application/json'
  }
};

const request = https.request(requestOptions, function(response) {
  console.log('Response status:', response.statusCode);

  oboe(response)
    .node('!.items.*', function(item) {
      // Process search results as they arrive
      console.log('Found:', item.name, item.url);
    })
    .node('!.pagination', function(pagination) {
      console.log('Pagination info:', pagination);
    });
});

request.on('error', function(error) {
  console.error('Request error:', error);
});

request.end();
```

### POST Request Stream Processing

Handle streaming responses from POST requests with request bodies.

```javascript { .api }
/**
 * Stream processing for POST requests
 * Useful for search APIs and data submission endpoints
 */
```

**POST Stream Examples:**

```javascript
const http = require('http');
const querystring = require('querystring');

// POST with form data
const postData = querystring.stringify({
  query: 'large dataset search',
  format: 'json',
  limit: 1000
});

const options = {
  hostname: 'api.example.com',
  port: 80,
  path: '/search',
  method: 'POST',
  headers: {
    'Content-Type': 'application/x-www-form-urlencoded',
    'Content-Length': Buffer.byteLength(postData)
  }
};

const request = http.request(options, function(response) {
  oboe(response)
    .node('!.results.*', function(result) {
      console.log('Search result:', result.title);
    })
    .done(function(json) {
      console.log('Search completed:', json.total_results, 'results');
    });
});

request.write(postData);
request.end();

// POST with JSON body
const jsonData = JSON.stringify({
  filters: { type: 'premium', active: true },
  sort: 'created_date',
  limit: 500
});

const jsonOptions = {
  hostname: 'api.example.com',
  path: '/filtered-data',
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Content-Length': Buffer.byteLength(jsonData)
  }
};

const jsonRequest = http.request(jsonOptions, function(response) {
  oboe(response)
    .node('!.data.*', processFilteredItem)
    .fail(handleRequestError);
});

jsonRequest.write(jsonData);
jsonRequest.end();
```

### Stream Error Handling

Handle the main classes of stream-related errors: file system errors, network errors, and JSON parsing errors.

```javascript { .api }
/**
 * Comprehensive error handling for stream processing
 * Covers network errors, file errors, and parsing errors
 */
```

**Error Handling Examples:**

```javascript
const fs = require('fs');
const http = require('http');

// File stream error handling
function processFileStream(filename) {
  const stream = fs.createReadStream(filename);

  // Handle file system errors
  stream.on('error', function(error) {
    console.error('File stream error:', error.message);
    if (error.code === 'ENOENT') {
      console.error('File not found:', filename);
    } else if (error.code === 'EACCES') {
      console.error('Permission denied:', filename);
    }
  });

  oboe(stream)
    .fail(function(error) {
      if (error.thrown) {
        console.error('JSON parsing error:', error.thrown.message);
      } else {
        console.error('Stream processing error:', error);
      }
    });
}

// HTTP stream error handling
function processHttpStream(url) {
  http.get(url, function(response) {
    if (response.statusCode !== 200) {
      console.error('HTTP error:', response.statusCode, response.statusMessage);
      return;
    }

    oboe(response)
      .start(function(statusCode, headers) {
        const contentType = headers['content-type'];
        if (!contentType || !contentType.includes('application/json')) {
          console.warn('Unexpected content type:', contentType);
        }
      })
      .fail(function(error) {
        console.error('Response processing error:', error);
      });
  }).on('error', function(error) {
    console.error('Request error:', error.message);
  });
}
```
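For reference, the object passed to `.fail()` is oboe's error report, which the oboe documentation defines with `thrown`, `statusCode`, `body`, and `jsonBody` fields; the HTTP fields are only populated when oboe issued the request itself. A small sketch of branching on it (the `data.json` path is a hypothetical placeholder):

```javascript
const oboe = require('oboe');
const fs = require('fs');

oboe(fs.createReadStream('data.json'))
  .fail(function(errorReport) {
    if (errorReport.thrown) {
      // An exception was thrown while parsing (or in a callback)
      console.error('Parse error:', errorReport.thrown.message);
    } else if (errorReport.statusCode) {
      // HTTP failure; only set when oboe made the request itself
      console.error('HTTP error:', errorReport.statusCode, errorReport.body);
    } else {
      console.error('Stream error:', errorReport);
    }
  });
```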

### Stream Performance Optimization

Optimize stream processing for better performance and memory usage.

```javascript { .api }
/**
 * Performance optimization techniques for stream processing
 * Memory management and processing efficiency
 */
```

**Performance Examples:**

```javascript
const fs = require('fs');

// Optimized file stream with buffer control
function processLargeFile(filename) {
  const stream = fs.createReadStream(filename, {
    encoding: 'utf8',
    highWaterMark: 16 * 1024 // 16KB chunks
  });

  let processedCount = 0;

  oboe(stream)
    .node('!.records.*', function(record) {
      processedCount++;

      // Process record immediately to avoid memory buildup
      processRecord(record);

      // Optional: pause the source stream for backpressure control
      if (processedCount % 1000 === 0) {
        console.log('Processed', processedCount, 'records');

        // Pause, then yield to the event loop before resuming
        stream.pause();
        setImmediate(function() {
          stream.resume();
        });
      }
    })
    .done(function() {
      console.log('Total processed:', processedCount, 'records');
    });
}

// Memory-efficient batch processing
function batchProcessStream(stream, batchSize = 100) {
  let batch = [];

  oboe(stream)
    .node('!.items.*', function(item) {
      batch.push(item);

      if (batch.length >= batchSize) {
        processBatch(batch);
        batch = []; // Clear batch to free memory
      }
    })
    .done(function() {
      if (batch.length > 0) {
        processBatch(batch); // Process remaining items
      }
    });
}

function processBatch(items) {
  console.log('Processing batch of', items.length, 'items');
  // Batch processing logic here
}

function processRecord(record) {
  // Individual record processing logic
  console.log('Processing record:', record.id);
}
```

### Stream Monitoring and Debugging

Monitor stream processing progress and debug issues.

```javascript { .api }
/**
 * Stream processing monitoring and debugging utilities
 * Progress tracking and performance metrics
 */
```

**Monitoring Examples:**

```javascript
const fs = require('fs');

function monitoredStreamProcessing(filename) {
  const stats = {
    startTime: Date.now(),
    bytesProcessed: 0,
    itemsProcessed: 0,
    errors: 0
  };

  const stream = fs.createReadStream(filename);

  // Monitor stream progress
  stream.on('data', function(chunk) {
    stats.bytesProcessed += chunk.length;
  });

  stream.on('end', function() {
    const duration = Date.now() - stats.startTime;
    console.log('Stream completed in', duration, 'ms');
    console.log('Bytes processed:', stats.bytesProcessed);
    console.log('Items processed:', stats.itemsProcessed);
    console.log('Processing rate:', Math.round(stats.itemsProcessed / (duration / 1000)), 'items/sec');
  });

  oboe(stream)
    .node('!.data.*', function(item) {
      stats.itemsProcessed++;

      // Progress reporting
      if (stats.itemsProcessed % 1000 === 0) {
        const elapsed = Date.now() - stats.startTime;
        const rate = Math.round(stats.itemsProcessed / (elapsed / 1000));
        console.log(`Progress: ${stats.itemsProcessed} items (${rate} items/sec)`);
      }
    })
    .fail(function(error) {
      stats.errors++;
      console.error('Processing error:', error);
    });
}
```

## Node.js vs Browser Differences

### Node.js Specific Features:

- **ReadableStream processing**: File streams, HTTP response streams
- **File system integration**: Direct file processing via `fs.createReadStream()`
- **HTTP client integration**: Built-in `http` and `https` module support
- **Stream control**: Backpressure handling and flow control
- **Memory management**: Better control over memory usage with large datasets

### Browser Limitations:

- **No stream processing**: Browser version only supports HTTP URLs and manual content feeding
- **No file system access**: Cannot process local files directly
- **Limited HTTP control**: Relies on the browser's fetch/XMLHttpRequest capabilities
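For contrast, here is the same progressive parse in each environment. `oboe(url)` is oboe's documented URL-based call (it also works in Node, where oboe issues the HTTP request itself); the endpoint and `!.items.*` layout are hypothetical:

```javascript
// Browser (or Node): pass a URL and oboe performs the request
oboe('https://api.example.com/items')
  .node('!.items.*', function(item) {
    console.log('Item:', item);
  });

// Node.js only: pass any readable stream directly
const fs = require('fs');
oboe(fs.createReadStream('items.json'))
  .node('!.items.*', function(item) {
    console.log('Item:', item);
  });
```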

## Stream Processing Best Practices

1. **Error Handling**: Always handle both stream errors and parsing errors
2. **Memory Management**: Process items immediately to avoid memory buildup
3. **Progress Monitoring**: Track processing progress for long-running operations
4. **Backpressure Control**: Use pausing and resuming for large datasets (see the sketch after this list)
5. **Resource Cleanup**: Properly close streams and clean up resources
6. **Performance Monitoring**: Monitor processing rates and optimize accordingly
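A minimal sketch of points 4 and 5 together, assuming an `!.items.*` document layout and a hypothetical 10,000-item cutoff; `instance.abort()` is oboe's documented way to stop a parse early, and `stream.destroy()` releases the underlying file handle:

```javascript
const oboe = require('oboe');
const fs = require('fs');

const stream = fs.createReadStream('items.json');
let seen = 0;

const instance = oboe(stream)
  .node('!.items.*', function(item) {
    seen++;

    // Backpressure: periodically pause the source, then resume
    // on the next turn of the event loop
    if (seen % 1000 === 0) {
      stream.pause();
      setImmediate(function() {
        stream.resume();
      });
    }

    // Cleanup: stop early once we have enough items
    if (seen >= 10000) {
      instance.abort();   // stop oboe from parsing further
      stream.destroy();   // release the underlying file handle
    }
  })
  .fail(function(error) {
    stream.destroy();     // clean up on failure as well
    console.error('Processing failed:', error);
  });
```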