or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

advanced-decoding.mdcbor-types.mddiagnostic-tools.mdencoding-decoding.mdindex.mdintegration-utilities.mdstream-processing.md
tile.json

stream-processing.mddocs/

Stream Processing

Stream-based encoding and decoding for handling large datasets, real-time data processing, and memory-efficient operations using Node.js streams.

Capabilities

Encoder Stream

Stream-based encoder for CBOR data that extends Node.js Transform stream.

/**
 * Stream-based encoder for CBOR data
 */
class Encoder extends Transform {
  /**
   * Create encoder stream
   * @param {EncodingOptions} [options] - Encoding options
   */
  constructor(options);
  
  /**
   * Add custom semantic type encoder
   * @param {string|Function} type - Type name or constructor function
   * @param {EncodeFunction} func - Encoding function
   */
  addSemanticType(type, func);
  
  /**
   * Encode any supported type
   * @param {any} obj - Object to encode
   * @returns {boolean} Success indicator
   */
  pushAny(obj);
  
  /**
   * Clear loop detection state
   */
  removeLoopDetectors();
  
  /**
   * Reset semantic types to defaults
   */
  static reset();
  
  /**
   * Get/set semantic type mappings
   * @type {SemanticMap}
   */
  static SEMANTIC_TYPES;
}

Usage Examples:

const cbor = require("cbor");
const fs = require("fs");

// Create encoder stream with options
const encoder = new cbor.Encoder({
  canonical: true,
  dateType: 'string',
  detectLoops: true
});

// Pipe data through encoder
const readStream = fs.createReadStream('input.json');
const writeStream = fs.createWriteStream('output.cbor');

readStream
  .pipe(new JSONParseStream()) // Custom transform to parse JSON
  .pipe(encoder)
  .pipe(writeStream);

// Manual encoding with pushAny
const encoder2 = new cbor.Encoder();
encoder2.on('data', (chunk) => {
  console.log('Encoded chunk:', chunk);
});

encoder2.pushAny({ name: "Alice", age: 30 });
encoder2.pushAny([1, 2, 3, 4]);
encoder2.end();

Decoder Stream

Stream-based decoder for CBOR data that extends Node.js Transform stream.

/**
 * Stream-based decoder for CBOR data
 */
class Decoder extends Transform {
  /**
   * Create decoder stream
   * @param {DecoderOptions} [options] - Decoding options
   */
  constructor(options);
  
  /**
   * Stop processing and close the stream
   */
  close();
  
  /**
   * Convert CBOR null/undefined symbols to actual values
   * @param {any} val - Value to check and convert
   * @returns {any} Converted value
   */
  static nullcheck(val);
  
  /**
   * Is decoder currently running
   * @type {boolean}
   */
  running;
  
  /**
   * Maximum parsing depth
   * @type {number}
   */
  max_depth;
  
  /**
   * Tag number to function mappings
   * @type {TagMap}
   */
  tags;
  
  /**
   * Prefer Uint8Arrays over Buffers
   * @type {boolean}
   */
  preferWeb;
  
  /**
   * Emit extended result objects
   * @type {boolean}
   */
  extendedResults;
  
  /**
   * Error on no data
   * @type {boolean}
   */
  required;
  
  /**
   * Error on duplicate map keys
   * @type {boolean}
   */
  preventDuplicateKeys;
  
  /**
   * Bytes for decoded value
   * @type {NoFilter}
   */
  valueBytes;
}

Usage Examples:

const cbor = require("cbor");
const fs = require("fs");

// Create decoder stream with options
const decoder = new cbor.Decoder({
  max_depth: 64,
  preferMap: true,
  extendedResults: true
});

// Pipe CBOR data through decoder
const readStream = fs.createReadStream('data.cbor');
readStream
  .pipe(decoder)
  .on('data', (decoded) => {
    console.log('Decoded object:', decoded);
  })
  .on('error', (err) => {
    console.error('Decode error:', err);
  });

// Stream processing with custom tag handlers
const decoderWithTags = new cbor.Decoder({
  tags: {
    // Handle custom tag 100
    100: (value) => {
      return new MyCustomClass(value);
    },
    // Handle date strings
    0: (dateString) => {
      return new Date(dateString);
    }
  }
});

// Manual stream control
const decoder2 = new cbor.Decoder();
decoder2.on('data', (obj) => {
  console.log('Decoded:', obj);
  
  // Stop after first object
  decoder2.close();
});

// Write CBOR data to decoder
const encodedData = cbor.encode({ test: "data" });
decoder2.write(encodedData);

SharedValueEncoder Stream

Stream-based encoder that implements CBOR value sharing for object deduplication and smaller output sizes.

/**
 * Stream-based encoder with value sharing support
 */
class SharedValueEncoder extends Encoder {
  /**
   * Create shared value encoder stream
   * @param {EncodingOptions} [options] - Encoding options
   */
  constructor(options);
  
  /**
   * Object recorder for tracking duplicate values
   * @type {ObjectRecorder}
   */
  valueSharing;
  
  /**
   * Stop recording object references and begin emitting sharing tags
   */
  stopRecording();
  
  /**
   * Clear recorded objects and restart tracking
   */
  clearRecording();
  
  /**
   * Encode objects with value sharing (static method)
   * @param {...any} objs - Objects to encode
   * @returns {Buffer} CBOR encoded data with sharing tags
   */
  static encode(...objs);
  
  /**
   * Encode single object with value sharing (static method)
   * @param {any} obj - Object to encode
   * @param {EncodingOptions} [options] - Encoding options
   * @returns {Buffer} CBOR encoded data with sharing tags
   */
  static encodeOne(obj, options);
  
  /**
   * Async encode with value sharing (static method)
   * @param {any} obj - Object to encode
   * @param {EncodingOptions} [options] - Encoding options
   * @returns {Promise<Buffer>} Promise resolving to CBOR encoded data
   */
  static encodeAsync(obj, options);
  
  /**
   * Canonical encoding is not supported with value sharing
   * @param {...any} objs - Objects to encode
   * @throws {Error} Always throws - canonical encoding incompatible with value sharing
   */
  static encodeCanonical(...objs);
}

/**
 * Object recorder for tracking shared values
 */
class ObjectRecorder {
  constructor();
  
  /**
   * Object tracking map
   * @type {WeakMap}
   */
  map;
  
  /**
   * Shared object counter
   * @type {number}
   */
  count;
  
  /**
   * Whether currently recording objects
   * @type {boolean}
   */
  recording;
  
  /**
   * Clear tracked objects and restart recording
   */
  clear();
  
  /**
   * Stop recording mode
   */
  stop();
  
  /**
   * Check if object should be tagged for sharing
   * @param {object} obj - Object to check
   * @returns {number} Sharing status code
   */
  check(obj);
  
  /**
   * Never duplicated constant
   * @type {number}
   */
  static NEVER;
  
  /**
   * First occurrence constant  
   * @type {number}
   */
  static FIRST;
}

Usage Examples:

const cbor = require("cbor");

// Basic value sharing
const data = {
  user: { name: "Alice", id: 123 },
  permissions: ["read", "write"]
};

// Reuse the same objects multiple times
const complex = {
  admin: data.user,      // Will be shared
  editor: data.user,     // Reference to shared value
  viewer: data.user,     // Reference to shared value
  adminPerms: data.permissions,  // Will be shared
  editorPerms: data.permissions  // Reference to shared value
};

// Encode with value sharing (smaller output)
const sharedEncoded = cbor.SharedValueEncoder.encode(complex);
const regularEncoded = cbor.encode(complex);

console.log('Shared size:', sharedEncoded.length);
console.log('Regular size:', regularEncoded.length);

// Stream processing with value sharing
const sharedEncoder = new cbor.SharedValueEncoder();
const chunks = [];

sharedEncoder.on('data', (chunk) => {
  chunks.push(chunk);
});

sharedEncoder.on('finish', () => {
  const result = Buffer.concat(chunks);
  console.log('Encoded with sharing:', result);
});

// First pass - record objects
sharedEncoder.pushAny(complex);
sharedEncoder.stopRecording(); // Switch to sharing mode

// Second pass - emit with sharing tags
sharedEncoder.pushAny(complex);
sharedEncoder.end();

// Manual control of recording
const encoder = new cbor.SharedValueEncoder();
encoder.pushAny({ a: 1 });     // First occurrence  
encoder.pushAny({ a: 1 });     // Will be tagged for sharing
encoder.stopRecording();       // Stop recording phase
encoder.clearRecording();      // Clear for new encoding session

Streaming with Custom Semantic Types

Add custom type encoding/decoding to streams for application-specific data types.

/**
 * Semantic type encoder function for use with addSemanticType
 * @callback EncodeFunction
 * @param {Encoder} enc - The encoder instance
 * @param {any} val - The value to encode
 * @returns {boolean} True on success
 */

Usage Examples:

const cbor = require("cbor");

// Define custom class
class Person {
  constructor(name, age) {
    this.name = name;
    this.age = age;
  }
  
  toJSON() {
    return { name: this.name, age: this.age };
  }
}

// Create encoder with custom semantic type
const encoder = new cbor.Encoder();

// Add custom type encoder
encoder.addSemanticType(Person, (enc, person) => {
  // Encode as tagged value (tag 1000)
  enc.pushAny(new cbor.Tagged(1000, {
    name: person.name,
    age: person.age
  }));
  return true;
});

// Create decoder with custom tag handler
const decoder = new cbor.Decoder({
  tags: {
    1000: (obj) => new Person(obj.name, obj.age)
  }
});

// Stream processing
encoder.pipe(decoder);

decoder.on('data', (obj) => {
  console.log('Decoded person:', obj instanceof Person); // true
  console.log('Name:', obj.name, 'Age:', obj.age);
});

// Encode custom objects
encoder.write(new Person("Alice", 30));
encoder.write(new Person("Bob", 25));
encoder.end();

Stream Error Handling

Handle errors in streaming operations with proper error propagation.

Usage Examples:

const cbor = require("cbor");
const { pipeline } = require("stream");

// Error handling in streams
const encoder = new cbor.Encoder({ detectLoops: true });
const decoder = new cbor.Decoder({ max_depth: 10 });

encoder.on('error', (err) => {
  console.error('Encoder error:', err);
});

decoder.on('error', (err) => {
  console.error('Decoder error:', err);
});

// Using pipeline for automatic error handling
pipeline(
  sourceStream,
  encoder,
  decoder,
  destinationStream,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline completed successfully');
    }
  }
);

// Graceful shutdown
process.on('SIGINT', () => {
  encoder.close();
  decoder.close();
});

Memory Management

Stream processing provides memory-efficient handling of large datasets.

Usage Examples:

const cbor = require("cbor");

// Process large datasets without loading everything into memory
function processLargeDataset(inputFile, outputFile) {
  const encoder = new cbor.Encoder({
    canonical: true,
    omitUndefinedProperties: true
  });
  
  const decoder = new cbor.Decoder({
    max_depth: 32,
    preferWeb: false
  });
  
  let processedCount = 0;
  
  decoder.on('data', (obj) => {
    // Process each object as it's decoded
    const processed = transformObject(obj);
    encoder.write(processed);
    processedCount++;
    
    if (processedCount % 1000 === 0) {
      console.log(`Processed ${processedCount} objects`);
    }
  });
  
  // Set up file streams
  fs.createReadStream(inputFile)
    .pipe(decoder);
    
  encoder
    .pipe(fs.createWriteStream(outputFile));
}

function transformObject(obj) {
  // Transform logic here
  return {
    ...obj,
    processed_at: new Date(),
    version: '2.0'
  };
}