# Stream Processing

Oboe.js provides Node.js-specific capabilities for processing readable streams, including file streams, HTTP response streams, and any other Node.js readable stream source. This enables server-side JSON processing without loading an entire document into memory.
3
4
## Capabilities
5
6
### Stream Constructor
7
8
Create an oboe instance that processes a Node.js readable stream.
9
10

```javascript { .api }
/**
 * Create an oboe instance for processing a readable stream
 * @param {stream.Readable} stream - Node.js readable stream
 * @returns {OboeInstance} Configured oboe instance
 */
function oboe(stream: Readable): OboeInstance;
```

**Stream Type Support:**

- File streams (`fs.createReadStream()`)
- HTTP/HTTPS response streams
- Transform streams (see the gzip sketch after the usage examples)
- Any Node.js readable stream with standard methods

**Usage Examples:**

```javascript
const oboe = require('oboe');
const fs = require('fs');
const http = require('http');

// File stream processing
const fileStream = fs.createReadStream('large-data.json');
oboe(fileStream)
  .node('!.records.*', function(record) {
    console.log('Processing record:', record.id);
  })
  .done(function(json) {
    console.log('File processing complete');
  });

// HTTP response stream
http.get('http://api.example.com/stream', function(res) {
  oboe(res)
    .node('!.items.*', processItem)
    .fail(handleNetworkError);
});

// HTTPS with custom options
const https = require('https');
const options = {
  hostname: 'api.example.com',
  path: '/secure-data',
  headers: { 'Authorization': 'Bearer token123' }
};

https.get(options, function(res) {
  oboe(res)
    .start(function(statusCode, headers) {
      console.log('Response status:', statusCode);
    })
    .node('!.data.*', processSecureData);
});
```
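
Transform streams slot in the same way: anything readable can be handed to `oboe()`. A minimal sketch, assuming a gzip-compressed file `records.json.gz` (the filename and the `!.records.*` pattern are illustrative):

```javascript
const oboe = require('oboe');
const fs = require('fs');
const zlib = require('zlib');

// Pipe the compressed file through a gunzip Transform stream,
// then let oboe consume the decompressed JSON
const gunzip = zlib.createGunzip();
fs.createReadStream('records.json.gz').pipe(gunzip);

oboe(gunzip)
  .node('!.records.*', function(record) {
    console.log('Record from compressed file:', record.id);
  })
  .fail(function(error) {
    console.error('Decompression or parsing error:', error);
  });
```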

### File Stream Processing

Process JSON files without loading them entirely into memory.

```javascript { .api }
/**
 * Process JSON files as streams
 * Best for large JSON files that exceed memory limits
 */
```

**File Processing Examples:**

```javascript
const fs = require('fs');
const path = require('path');

// Large JSON file processing
const filePath = path.join(__dirname, 'data', 'huge-dataset.json');
const fileStream = fs.createReadStream(filePath, { encoding: 'utf8' });

oboe(fileStream)
  .node('!.transactions.*', function(transaction) {
    // Process each transaction as it's parsed
    if (transaction.amount > 1000) {
      console.log('Large transaction:', transaction.id, transaction.amount);
    }
  })
  .node('!.metadata', function(metadata) {
    console.log('Dataset metadata:', metadata);
  })
  .done(function() {
    console.log('File processing completed');
  })
  .fail(function(error) {
    console.error('File processing error:', error);
  });

// Multiple file processing
const files = ['data1.json', 'data2.json', 'data3.json'];

files.forEach(function(filename) {
  const stream = fs.createReadStream(filename);

  oboe(stream)
    .node('!.items.*', function(item) {
      console.log(`Item from ${filename}:`, item);
    })
    .fail(function(error) {
      console.error(`Error processing ${filename}:`, error);
    });
});
```
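
The `forEach` loop above starts all files at once. Where ordering matters, one option is to wrap each run in a Promise and await them in turn; a sketch (the `processFilesSequentially` helper is illustrative, not part of oboe):

```javascript
const oboe = require('oboe');
const fs = require('fs');

// Resolve once a single file has been fully parsed
function processFile(filename) {
  return new Promise(function(resolve, reject) {
    oboe(fs.createReadStream(filename))
      .node('!.items.*', function(item) {
        console.log(`Item from ${filename}:`, item);
      })
      .done(resolve)
      .fail(reject);
  });
}

// Process files one at a time instead of concurrently
async function processFilesSequentially(files) {
  for (const filename of files) {
    await processFile(filename);
  }
}

processFilesSequentially(['data1.json', 'data2.json', 'data3.json']);
```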

### HTTP Response Stream Processing

Process HTTP responses as they arrive, ideal for APIs that return large JSON responses.

```javascript { .api }
/**
 * Process HTTP responses progressively
 * Reduces memory usage and improves response time for large APIs
 */
```

**HTTP Stream Examples:**

```javascript
const http = require('http');
const https = require('https');

// Simple HTTP GET stream
http.get('http://api.example.com/large-dataset', function(response) {
  oboe(response)
    .start(function(statusCode, headers) {
      console.log('Response started:', statusCode);
      console.log('Content-Length:', headers['content-length']);
    })
    .node('!.results.*', function(result, path) {
      console.log(`Result ${path[1]}:`, result.title);
    })
    .done(function(json) {
      console.log('Total results:', json.results.length);
    });
});

// HTTPS with request configuration
const requestOptions = {
  hostname: 'api.example.com',
  port: 443,
  path: '/v2/search?q=node.js&limit=1000',
  method: 'GET',
  headers: {
    'User-Agent': 'NodeJS Stream Client',
    'Accept': 'application/json'
  }
};

const request = https.request(requestOptions, function(response) {
  console.log('Response status:', response.statusCode);

  oboe(response)
    .node('!.items.*', function(item) {
      // Process search results as they arrive
      console.log('Found:', item.name, item.url);
    })
    .node('!.pagination', function(pagination) {
      console.log('Pagination info:', pagination);
    });
});

request.on('error', function(error) {
  console.error('Request error:', error);
});

request.end();
```

### POST Request Stream Processing

Handle streaming responses from POST requests with request bodies.

```javascript { .api }
/**
 * Stream processing for POST requests
 * Useful for search APIs and data submission endpoints
 */
```

**POST Stream Examples:**

```javascript
const http = require('http');
const querystring = require('querystring');

// POST with form data
const postData = querystring.stringify({
  query: 'large dataset search',
  format: 'json',
  limit: 1000
});

const options = {
  hostname: 'api.example.com',
  port: 80,
  path: '/search',
  method: 'POST',
  headers: {
    'Content-Type': 'application/x-www-form-urlencoded',
    'Content-Length': Buffer.byteLength(postData)
  }
};

const request = http.request(options, function(response) {
  oboe(response)
    .node('!.results.*', function(result) {
      console.log('Search result:', result.title);
    })
    .done(function(json) {
      console.log('Search completed:', json.total_results, 'results');
    });
});

request.write(postData);
request.end();

// POST with JSON body
const jsonData = JSON.stringify({
  filters: { type: 'premium', active: true },
  sort: 'created_date',
  limit: 500
});

const jsonOptions = {
  hostname: 'api.example.com',
  path: '/filtered-data',
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Content-Length': Buffer.byteLength(jsonData)
  }
};

const jsonRequest = http.request(jsonOptions, function(response) {
  oboe(response)
    .node('!.data.*', processFilteredItem)
    .fail(handleRequestError);
});

jsonRequest.write(jsonData);
jsonRequest.end();
```

### Stream Error Handling

Handle the various types of stream-related errors.

```javascript { .api }
/**
 * Comprehensive error handling for stream processing
 * Covers network errors, file errors, and parsing errors
 */
```

**Error Handling Examples:**

```javascript
const fs = require('fs');
const http = require('http');

// File stream error handling
function processFileStream(filename) {
  const stream = fs.createReadStream(filename);

  // Handle file system errors
  stream.on('error', function(error) {
    console.error('File stream error:', error.message);
    if (error.code === 'ENOENT') {
      console.error('File not found:', filename);
    } else if (error.code === 'EACCES') {
      console.error('Permission denied:', filename);
    }
  });

  oboe(stream)
    .fail(function(error) {
      if (error.thrown) {
        console.error('JSON parsing error:', error.thrown.message);
      } else {
        console.error('Stream processing error:', error);
      }
    });
}

// HTTP stream error handling
function processHttpStream(url) {
  http.get(url, function(response) {
    if (response.statusCode !== 200) {
      console.error('HTTP error:', response.statusCode, response.statusMessage);
      response.resume(); // Drain the unused response so the socket is released
      return;
    }

    oboe(response)
      .start(function(statusCode, headers) {
        const contentType = headers['content-type'];
        if (!contentType || !contentType.includes('application/json')) {
          console.warn('Unexpected content type:', contentType);
        }
      })
      .fail(function(error) {
        console.error('Response processing error:', error);
      });
  }).on('error', function(error) {
    console.error('Request error:', error.message);
  });
}
```
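
Oboe passes its `fail` callbacks a single error report object rather than a bare `Error`. A reusable handler covering its fields might look like this sketch (`thrown` holds a caught exception; `statusCode`, `body`, and `jsonBody` are only populated when oboe issued the HTTP request itself):

```javascript
// Shared fail handler, usable as .fail(handleOboeFailure)
function handleOboeFailure(errorReport) {
  if (errorReport.thrown) {
    // A JSON parsing error, or an exception thrown from a callback
    console.error('Parse/callback error:', errorReport.thrown.message);
  } else if (errorReport.statusCode) {
    // HTTP-level failure (when oboe made the request itself)
    console.error('HTTP error:', errorReport.statusCode, errorReport.body);
  } else {
    console.error('Stream processing error:', errorReport);
  }
}
```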

### Stream Performance Optimization

Optimize stream processing for better performance and memory usage.

```javascript { .api }
/**
 * Performance optimization techniques for stream processing
 * Memory management and processing efficiency
 */
```

**Performance Examples:**

```javascript
const fs = require('fs');

// Optimized file stream with buffer control
function processLargeFile(filename) {
  const stream = fs.createReadStream(filename, {
    encoding: 'utf8',
    highWaterMark: 16 * 1024 // 16KB chunks
  });

  let processedCount = 0;

  oboe(stream)
    .node('!.records.*', function(record) {
      processedCount++;

      // Process the record immediately to avoid memory buildup
      processRecord(record);

      // Optional: pause the source for backpressure control
      if (processedCount % 1000 === 0) {
        console.log('Processed', processedCount, 'records');

        // Pause the underlying stream so the event loop (and the
        // garbage collector) can catch up, then resume
        stream.pause();
        setImmediate(function() {
          stream.resume();
        });
      }
    })
    .done(function() {
      console.log('Total processed:', processedCount, 'records');
    });
}

// Memory-efficient batch processing
function batchProcessStream(stream, batchSize = 100) {
  let batch = [];

  oboe(stream)
    .node('!.items.*', function(item) {
      batch.push(item);

      if (batch.length >= batchSize) {
        processBatch(batch);
        batch = []; // Start a new batch to free memory
      }
    })
    .done(function() {
      if (batch.length > 0) {
        processBatch(batch); // Process remaining items
      }
    });
}

function processBatch(items) {
  console.log('Processing batch of', items.length, 'items');
  // Batch processing logic here
}

function processRecord(record) {
  // Individual record processing logic
  console.log('Processing record:', record.id);
}
```
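
Even with immediate processing, oboe assembles the full JSON tree in memory by default. Returning `oboe.drop` from a `node` callback prunes each handled node from that tree; a sketch, reusing `processRecord` from above (the filename and pattern are illustrative):

```javascript
const oboe = require('oboe');
const fs = require('fs');

oboe(fs.createReadStream('huge-dataset.json'))
  .node('!.records.*', function(record) {
    processRecord(record);
    // Remove this record from oboe's internal tree once handled,
    // keeping memory usage roughly constant
    return oboe.drop;
  })
  .done(function() {
    console.log('Finished without retaining records in memory');
  });
```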

### Stream Monitoring and Debugging

Monitor stream processing progress and debug issues.

```javascript { .api }
/**
 * Stream processing monitoring and debugging utilities
 * Progress tracking and performance metrics
 */
```

**Monitoring Examples:**

```javascript
const fs = require('fs');

function monitoredStreamProcessing(filename) {
  const stats = {
    startTime: Date.now(),
    bytesProcessed: 0,
    itemsProcessed: 0,
    errors: 0
  };

  const stream = fs.createReadStream(filename);

  // Monitor stream progress
  stream.on('data', function(chunk) {
    stats.bytesProcessed += chunk.length;
  });

  stream.on('end', function() {
    const duration = Date.now() - stats.startTime;
    console.log('Stream completed in', duration, 'ms');
    console.log('Bytes processed:', stats.bytesProcessed);
    console.log('Items processed:', stats.itemsProcessed);
    console.log('Processing rate:', Math.round(stats.itemsProcessed / (duration / 1000)), 'items/sec');
  });

  oboe(stream)
    .node('!.data.*', function(item) {
      stats.itemsProcessed++;

      // Progress reporting
      if (stats.itemsProcessed % 1000 === 0) {
        const elapsed = Date.now() - stats.startTime;
        const rate = Math.round(stats.itemsProcessed / (elapsed / 1000));
        console.log(`Progress: ${stats.itemsProcessed} items (${rate} items/sec)`);
      }
    })
    .fail(function(error) {
      stats.errors++;
      console.error('Processing error:', error);
    });
}
```

## Node.js vs Browser Differences

### Node.js Specific Features:
- **Readable stream processing**: File streams, HTTP response streams
- **File system integration**: Direct file processing via `fs.createReadStream()`
- **HTTP client integration**: Built-in `http` and `https` module support
- **Stream control**: Backpressure handling and flow control
- **Memory management**: Better control over memory usage with large datasets

### Browser Limitations:
- **No arbitrary stream input**: The browser build only streams HTTP responses it requests itself, plus manually fed content
- **No file system access**: Cannot process local files directly
- **Limited HTTP control**: Relies on the browser's XMLHttpRequest/fetch capabilities

## Stream Processing Best Practices

1. **Error Handling**: Always handle both stream errors and parsing errors
2. **Memory Management**: Process items immediately to avoid memory buildup
3. **Progress Monitoring**: Track processing progress for long-running operations
4. **Backpressure Control**: Use pausing and resuming for large datasets
5. **Resource Cleanup**: Properly close streams and clean up resources (see the sketch below)
6. **Performance Monitoring**: Monitor processing rates and optimize accordingly
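
For practice 5, a minimal cleanup sketch: destroy the underlying stream when processing fails partway, so the file descriptor is released (the filename is illustrative):

```javascript
const oboe = require('oboe');
const fs = require('fs');

const stream = fs.createReadStream('data.json');

oboe(stream)
  .node('!.items.*', function(item) {
    console.log('Item:', item);
  })
  .fail(function(errorReport) {
    // Release the file descriptor; a parse error partway through
    // can otherwise leave the stream open
    stream.destroy();
    console.error('Stopped after error:', errorReport);
  });
```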