tessl/npm-oboe

Progressive JSON streaming parser that enables processing data as it arrives over HTTP without waiting for the complete response

docs/stream-processing.md

Stream Processing

Oboe.js provides Node.js-specific capabilities for processing readable streams, including file streams, HTTP response streams, and any other Node.js readable stream source. This enables server-side JSON processing from various sources without loading entire files into memory.

Capabilities

Stream Constructor

Create an oboe instance that processes a Node.js readable stream.

/**
 * Create oboe instance for processing a readable stream
 * @param {ReadableStream} stream - Node.js readable stream
 * @returns {OboeInstance} Configured oboe instance
 */
function oboe(stream: ReadableStream): OboeInstance;

Stream Type Support:

  • File streams (fs.createReadStream())
  • HTTP/HTTPS response streams
  • Transform streams
  • Any Node.js readable stream with standard methods

Usage Examples:

const oboe = require('oboe');
const fs = require('fs');
const http = require('http');

// File stream processing
const fileStream = fs.createReadStream('large-data.json');
oboe(fileStream)
  .node('!.records.*', function(record) {
    console.log('Processing record:', record.id);
  })
  .done(function(json) {
    console.log('File processing complete');
  });

// HTTP response stream
http.get('http://api.example.com/stream', function(res) {
  oboe(res)
    .node('!.items.*', processItem)
    .fail(handleNetworkError);
});

// HTTPS with custom options
const https = require('https');
const options = {
  hostname: 'api.example.com',
  path: '/secure-data',
  headers: { 'Authorization': 'Bearer token123' }
};

https.get(options, function(res) {
  oboe(res)
    .start(function(statusCode, headers) {
      console.log('Response status:', statusCode);
    })
    .node('!.data.*', processSecureData);
});

File Stream Processing

Process JSON files without loading them entirely into memory.

/**
 * Process JSON files as streams
 * Best for large JSON files that exceed memory limits
 */

File Processing Examples:

const fs = require('fs');
const path = require('path');

// Large JSON file processing
const filePath = path.join(__dirname, 'data', 'huge-dataset.json');
const fileStream = fs.createReadStream(filePath, { encoding: 'utf8' });

oboe(fileStream)
  .node('!.transactions.*', function(transaction) {
    // Process each transaction as it's parsed
    if (transaction.amount > 1000) {
      console.log('Large transaction:', transaction.id, transaction.amount);
    }
  })
  .node('!.metadata', function(metadata) {
    console.log('Dataset metadata:', metadata);
  })
  .done(function() {
    console.log('File processing completed');
  })
  .fail(function(error) {
    console.error('File processing error:', error);
  });

// Multiple file processing
const files = ['data1.json', 'data2.json', 'data3.json'];

files.forEach(function(filename) {
  const stream = fs.createReadStream(filename);
  
  oboe(stream)
    .node('!.items.*', function(item) {
      console.log(`Item from ${filename}:`, item);
    })
    .fail(function(error) {
      console.error(`Error processing ${filename}:`, error);
    });
});

HTTP Response Stream Processing

Process HTTP responses as they arrive, ideal for APIs that return large JSON responses.

/**
 * Process HTTP responses progressively
 * Reduces memory usage and improves response time for large APIs
 */

HTTP Stream Examples:

const http = require('http');
const https = require('https');

// Simple HTTP GET stream
http.get('http://api.example.com/large-dataset', function(response) {
  oboe(response)
    .start(function(statusCode, headers) {
      console.log('Response started:', statusCode);
      console.log('Content-Length:', headers['content-length']);
    })
    .node('!.results.*', function(result, path) {
      console.log(`Result ${path[1]}:`, result.title);
    })
    .done(function(json) {
      console.log('Total results:', json.results.length);
    });
});

// HTTPS with request configuration
const requestOptions = {
  hostname: 'api.example.com',
  port: 443,
  path: '/v2/search?q=node.js&limit=1000',
  method: 'GET',
  headers: {
    'User-Agent': 'NodeJS Stream Client',
    'Accept': 'application/json'
  }
};

const request = https.request(requestOptions, function(response) {
  console.log('Response status:', response.statusCode);
  
  oboe(response)
    .node('!.items.*', function(item) {
      // Process search results as they arrive
      console.log('Found:', item.name, item.url);
    })
    .node('!.pagination', function(pagination) {
      console.log('Pagination info:', pagination);
    });
});

request.on('error', function(error) {
  console.error('Request error:', error);
});

request.end();

POST Request Stream Processing

Handle streaming responses from POST requests with request bodies.

/**
 * Stream processing for POST requests
 * Useful for search APIs and data submission endpoints
 */

POST Stream Examples:

const http = require('http');
const querystring = require('querystring');

// POST with form data
const postData = querystring.stringify({
  query: 'large dataset search',
  format: 'json',
  limit: 1000
});

const options = {
  hostname: 'api.example.com',
  port: 80,
  path: '/search',
  method: 'POST',
  headers: {
    'Content-Type': 'application/x-www-form-urlencoded',
    'Content-Length': Buffer.byteLength(postData)
  }
};

const request = http.request(options, function(response) {
  oboe(response)
    .node('!.results.*', function(result) {
      console.log('Search result:', result.title);
    })
    .done(function(json) {
      console.log('Search completed:', json.total_results, 'results');
    });
});

request.write(postData);
request.end();

// POST with JSON body
const jsonData = JSON.stringify({
  filters: { type: 'premium', active: true },
  sort: 'created_date',
  limit: 500
});

const jsonOptions = {
  hostname: 'api.example.com',
  path: '/filtered-data',
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Content-Length': Buffer.byteLength(jsonData)
  }
};

const jsonRequest = http.request(jsonOptions, function(response) {
  oboe(response)
    .node('!.data.*', processFilteredItem)
    .fail(handleRequestError);
});

jsonRequest.write(jsonData);
jsonRequest.end();

Stream Error Handling

Handle various types of stream-related errors.

/**
 * Comprehensive error handling for stream processing
 * Covers network errors, file errors, and parsing errors
 */

Error Handling Examples:

const fs = require('fs');
const http = require('http');

// File stream error handling
function processFileStream(filename) {
  const stream = fs.createReadStream(filename);
  
  // Handle file system errors
  stream.on('error', function(error) {
    console.error('File stream error:', error.message);
    if (error.code === 'ENOENT') {
      console.error('File not found:', filename);
    } else if (error.code === 'EACCES') {
      console.error('Permission denied:', filename);
    }
  });
  
  oboe(stream)
    .fail(function(error) {
      if (error.thrown) {
        console.error('JSON parsing error:', error.thrown.message);
      } else {
        console.error('Stream processing error:', error);
      }
    });
}

// HTTP stream error handling
function processHttpStream(url) {
  http.get(url, function(response) {
    if (response.statusCode !== 200) {
      console.error('HTTP error:', response.statusCode, response.statusMessage);
      response.resume(); // drain the response so the socket is released
      return;
    }
    
    oboe(response)
      .start(function(statusCode, headers) {
        const contentType = headers['content-type'];
        if (!contentType || !contentType.includes('application/json')) {
          console.warn('Unexpected content type:', contentType);
        }
      })
      .fail(function(error) {
        console.error('Response processing error:', error);
      });
  }).on('error', function(error) {
    console.error('Request error:', error.message);
  });
}

Stream Performance Optimization

Optimize stream processing for better performance and memory usage.

/**
 * Performance optimization techniques for stream processing
 * Memory management and processing efficiency
 */

Performance Examples:

const fs = require('fs');

// Optimized file stream with buffer control
function processLargeFile(filename) {
  const stream = fs.createReadStream(filename, {
    encoding: 'utf8',
    highWaterMark: 16 * 1024 // 16KB chunks
  });
  
  let processedCount = 0;
  
  oboe(stream)
    .node('!.records.*', function(record) {
      processedCount++;
      
      // Process record immediately to avoid memory buildup
      processRecord(record);
      
      // Periodic progress logging. Note that oboe has no built-in
      // pause API; backpressure must be applied on the source stream
      // itself (stream.pause() / stream.resume()).
      if (processedCount % 1000 === 0) {
        console.log('Processed', processedCount, 'records');
      }
    })
    .done(function() {
      console.log('Total processed:', processedCount, 'records');
    });
}

// Memory-efficient batch processing
function batchProcessStream(stream, batchSize = 100) {
  let batch = [];
  
  oboe(stream)
    .node('!.items.*', function(item) {
      batch.push(item);
      
      if (batch.length >= batchSize) {
        processBatch(batch);
        batch = []; // Clear batch to free memory
      }
    })
    .done(function() {
      if (batch.length > 0) {
        processBatch(batch); // Process remaining items
      }
    });
}

function processBatch(items) {
  console.log('Processing batch of', items.length, 'items');
  // Batch processing logic here
}

function processRecord(record) {
  // Individual record processing logic
  console.log('Processing record:', record.id);
}

Stream Monitoring and Debugging

Monitor stream processing progress and debug issues.

/**
 * Stream processing monitoring and debugging utilities
 * Progress tracking and performance metrics
 */

Monitoring Examples:

const fs = require('fs');

function monitoredStreamProcessing(filename) {
  const stats = {
    startTime: Date.now(),
    bytesProcessed: 0,
    itemsProcessed: 0,
    errors: 0
  };
  
  const stream = fs.createReadStream(filename);
  
  // Monitor stream progress
  stream.on('data', function(chunk) {
    stats.bytesProcessed += chunk.length;
  });
  
  stream.on('end', function() {
    const duration = Date.now() - stats.startTime;
    console.log('Stream completed in', duration, 'ms');
    console.log('Bytes processed:', stats.bytesProcessed);
    console.log('Items processed:', stats.itemsProcessed);
    console.log('Processing rate:', Math.round(stats.itemsProcessed / (duration / 1000)), 'items/sec');
  });
  
  oboe(stream)
    .node('!.data.*', function(item) {
      stats.itemsProcessed++;
      
      // Progress reporting
      if (stats.itemsProcessed % 1000 === 0) {
        const elapsed = Date.now() - stats.startTime;
        const rate = Math.round(stats.itemsProcessed / (elapsed / 1000));
        console.log(`Progress: ${stats.itemsProcessed} items (${rate} items/sec)`);
      }
    })
    .fail(function(error) {
      stats.errors++;
      console.error('Processing error:', error);
    });
}

Node.js vs Browser Differences

Node.js Specific Features:

  • ReadableStream processing: File streams, HTTP response streams
  • File system integration: Direct file processing via fs.createReadStream()
  • HTTP client integration: Built-in http and https module support
  • Stream control: Backpressure handling and flow control
  • Memory management: Better control over memory usage with large datasets

Browser Limitations:

  • No stream processing: Browser version only supports HTTP URLs and manual content feeding
  • No file system access: Cannot process local files directly
  • Limited HTTP control: Relies on browser's fetch/XMLHttpRequest capabilities

Stream Processing Best Practices

  1. Error Handling: Always handle both stream errors and parsing errors
  2. Memory Management: Process items immediately to avoid memory buildup
  3. Progress Monitoring: Track processing progress for long-running operations
  4. Backpressure Control: Use pausing and resuming for large datasets
  5. Resource Cleanup: Properly close streams and clean up resources
  6. Performance Monitoring: Monitor processing rates and optimize accordingly

Install with Tessl CLI

npx tessl i tessl/npm-oboe
