Progressive JSON streaming parser that enables processing data as it arrives over HTTP, without waiting for the complete response.
Oboe.js provides Node.js-specific capabilities for processing readable streams, including file streams, HTTP response streams, and any other Node.js readable stream source. This enables server-side JSON processing from various sources without loading entire files into memory.
Create an oboe instance that processes a Node.js readable stream.
/**
* Create oboe instance for processing a readable stream
* @param {ReadableStream} stream - Node.js readable stream
* @returns {OboeInstance} Configured oboe instance
*/
function oboe(stream: ReadableStream): OboeInstance;

Stream Type Support: file streams (fs.createReadStream()), HTTP and HTTPS response streams, and any other Node.js readable stream.

Usage Examples:
const oboe = require('oboe');
const fs = require('fs');
const http = require('http');
// File stream processing
const fileStream = fs.createReadStream('large-data.json');
oboe(fileStream)
.node('!.records.*', function(record) {
console.log('Processing record:', record.id);
})
.done(function(json) {
console.log('File processing complete');
});
// HTTP response stream
http.get('http://api.example.com/stream', function(res) {
oboe(res)
.node('!.items.*', processItem)
.fail(handleNetworkError);
});
// HTTPS with custom options
const https = require('https');
const options = {
hostname: 'api.example.com',
path: '/secure-data',
headers: { 'Authorization': 'Bearer token123' }
};
https.get(options, function(res) {
oboe(res)
.start(function(statusCode, headers) {
console.log('Response status:', statusCode);
})
.node('!.data.*', processSecureData);
});

Process JSON files without loading them entirely into memory.
/**
* Process JSON files as streams
* Best for large JSON files that exceed memory limits
*/

File Processing Examples:
const fs = require('fs');
const path = require('path');
// Large JSON file processing
const filePath = path.join(__dirname, 'data', 'huge-dataset.json');
const fileStream = fs.createReadStream(filePath, { encoding: 'utf8' });
oboe(fileStream)
.node('!.transactions.*', function(transaction) {
// Process each transaction as it's parsed
if (transaction.amount > 1000) {
console.log('Large transaction:', transaction.id, transaction.amount);
}
})
.node('!.metadata', function(metadata) {
console.log('Dataset metadata:', metadata);
})
.done(function() {
console.log('File processing completed');
})
.fail(function(error) {
console.error('File processing error:', error);
});
// Multiple file processing
const files = ['data1.json', 'data2.json', 'data3.json'];
files.forEach(function(filename) {
const stream = fs.createReadStream(filename);
oboe(stream)
.node('!.items.*', function(item) {
console.log(`Item from ${filename}:`, item);
})
.fail(function(error) {
console.error(`Error processing ${filename}:`, error);
});
});

Process HTTP responses as they arrive, ideal for APIs that return large JSON responses.
/**
* Process HTTP responses progressively
* Reduces memory usage and improves response time for large APIs
*/

HTTP Stream Examples:
const http = require('http');
const https = require('https');
// Simple HTTP GET stream
http.get('http://api.example.com/large-dataset', function(response) {
oboe(response)
.start(function(statusCode, headers) {
console.log('Response started:', statusCode);
console.log('Content-Length:', headers['content-length']);
})
.node('!.results.*', function(result, path) {
console.log(`Result ${path[1]}:`, result.title);
})
.done(function(json) {
console.log('Total results:', json.results.length);
});
});
// HTTPS with request configuration
const requestOptions = {
hostname: 'api.example.com',
port: 443,
path: '/v2/search?q=node.js&limit=1000',
method: 'GET',
headers: {
'User-Agent': 'NodeJS Stream Client',
'Accept': 'application/json'
}
};
const request = https.request(requestOptions, function(response) {
console.log('Response status:', response.statusCode);
oboe(response)
.node('!.items.*', function(item) {
// Process search results as they arrive
console.log('Found:', item.name, item.url);
})
.node('!.pagination', function(pagination) {
console.log('Pagination info:', pagination);
});
});
request.on('error', function(error) {
console.error('Request error:', error);
});
request.end();

Handle streaming responses from POST requests with request bodies.
/**
* Stream processing for POST requests
* Useful for search APIs and data submission endpoints
*/

POST Stream Examples:
const http = require('http');
const querystring = require('querystring');
// POST with form data
const postData = querystring.stringify({
query: 'large dataset search',
format: 'json',
limit: 1000
});
const options = {
hostname: 'api.example.com',
port: 80,
path: '/search',
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': Buffer.byteLength(postData)
}
};
const request = http.request(options, function(response) {
oboe(response)
.node('!.results.*', function(result) {
console.log('Search result:', result.title);
})
.done(function(json) {
console.log('Search completed:', json.total_results, 'results');
});
});
request.write(postData);
request.end();
// POST with JSON body
const jsonData = JSON.stringify({
filters: { type: 'premium', active: true },
sort: 'created_date',
limit: 500
});
const jsonOptions = {
hostname: 'api.example.com',
path: '/filtered-data',
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(jsonData)
}
};
const jsonRequest = http.request(jsonOptions, function(response) {
oboe(response)
.node('!.data.*', processFilteredItem)
.fail(handleRequestError);
});
jsonRequest.write(jsonData);
jsonRequest.end();

Handle various types of stream-related errors.
/**
* Comprehensive error handling for stream processing
* Covers network errors, file errors, and parsing errors
*/

Error Handling Examples:
const fs = require('fs');
const http = require('http');
// File stream error handling
/**
 * Open a file as a readable stream and parse it with oboe,
 * reporting file-system failures and JSON parsing failures separately.
 * @param {string} filename - Path of the JSON file to process
 */
function processFileStream(filename) {
  const source = fs.createReadStream(filename);
  // File-system level failures arrive on the stream itself, not via oboe.
  source.on('error', function (error) {
    console.error('File stream error:', error.message);
    switch (error.code) {
      case 'ENOENT':
        console.error('File not found:', filename);
        break;
      case 'EACCES':
        console.error('Permission denied:', filename);
        break;
    }
  });
  oboe(source)
    .fail(function (error) {
      // error.thrown is set when the JSON itself failed to parse.
      if (error.thrown) {
        console.error('JSON parsing error:', error.thrown.message);
      } else {
        console.error('Stream processing error:', error);
      }
    });
}
// HTTP stream error handling
function processHttpStream(url) {
http.get(url, function(response) {
if (response.statusCode !== 200) {
console.error('HTTP error:', response.statusCode, response.statusMessage);
return;
}
oboe(response)
.start(function(statusCode, headers) {
const contentType = headers['content-type'];
if (!contentType || !contentType.includes('application/json')) {
console.warn('Unexpected content type:', contentType);
}
})
.fail(function(error) {
console.error('Response processing error:', error);
});
}).on('error', function(error) {
console.error('Request error:', error.message);
});
}

Optimize stream processing for better performance and memory usage.
/**
* Performance optimization techniques for stream processing
* Memory management and processing efficiency
*/

Performance Examples:
const fs = require('fs');
// Optimized file stream with buffer control
/**
 * Stream a large JSON file in small chunks and handle each record the
 * moment it is parsed, so the full dataset never resides in memory.
 * @param {string} filename - Path of the JSON file to process
 */
function processLargeFile(filename) {
  const reader = fs.createReadStream(filename, {
    encoding: 'utf8',
    highWaterMark: 16 * 1024 // 16KB chunks
  });
  let seen = 0;
  oboe(reader)
    .node('!.records.*', function (record) {
      seen += 1;
      // Hand the record off immediately so nothing accumulates.
      processRecord(record);
      if (seen % 1000 === 0) {
        console.log('Processed', seen, 'records');
        // Yield to the event loop so garbage collection can run.
        setImmediate(() => {
          // Continue processing
        });
      }
    })
    .done(function () {
      console.log('Total processed:', seen, 'records');
    });
}
// Memory-efficient batch processing
/**
 * Collect streamed items into fixed-size groups so downstream work
 * happens in batches and memory stays bounded.
 * @param {ReadableStream} stream - Source stream for oboe
 * @param {number} [batchSize=100] - Items per batch
 */
function batchProcessStream(stream, batchSize = 100) {
  const pending = [];
  oboe(stream)
    .node('!.items.*', function (item) {
      pending.push(item);
      if (pending.length >= batchSize) {
        // Hand off the full group and start a fresh one.
        processBatch(pending.splice(0, pending.length));
      }
    })
    .done(function () {
      // Flush whatever is left over after the stream ends.
      if (pending.length > 0) {
        processBatch(pending);
      }
    });
}
/**
 * Handle one accumulated group of streamed items.
 * @param {Array} batch - Items collected for this batch
 */
function processBatch(batch) {
  console.log('Processing batch of', batch.length, 'items');
  // Batch processing logic here
}
function processRecord(record) {
// Individual record processing logic
console.log('Processing record:', record.id);
}

Monitor stream processing progress and debug issues.
/**
* Stream processing monitoring and debugging utilities
* Progress tracking and performance metrics
*/

Monitoring Examples:
const fs = require('fs');
function monitoredStreamProcessing(filename) {
const stats = {
startTime: Date.now(),
bytesProcessed: 0,
itemsProcessed: 0,
errors: 0
};
const stream = fs.createReadStream(filename);
// Monitor stream progress
stream.on('data', function(chunk) {
stats.bytesProcessed += chunk.length;
});
stream.on('end', function() {
const duration = Date.now() - stats.startTime;
console.log('Stream completed in', duration, 'ms');
console.log('Bytes processed:', stats.bytesProcessed);
console.log('Items processed:', stats.itemsProcessed);
console.log('Processing rate:', Math.round(stats.itemsProcessed / (duration / 1000)), 'items/sec');
});
oboe(stream)
.node('!.data.*', function(item) {
stats.itemsProcessed++;
// Progress reporting
if (stats.itemsProcessed % 1000 === 0) {
const elapsed = Date.now() - stats.startTime;
const rate = Math.round(stats.itemsProcessed / (elapsed / 1000));
console.log(`Progress: ${stats.itemsProcessed} items (${rate} items/sec)`);
}
})
.fail(function(error) {
stats.errors++;
console.error('Processing error:', error);
});
}

Supported stream sources: fs.createReadStream() file streams, plus http and https module support.

Install with Tessl CLI:
npx tessl i tessl/npm-oboe

evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10