CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-cbor

Encode and parse data in the Concise Binary Object Representation (CBOR) data format (RFC8949).

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

stream-processing.mddocs/

Stream Processing

Stream-based encoding and decoding for handling large datasets, real-time data processing, and memory-efficient operations using Node.js streams.

Capabilities

Encoder Stream

Stream-based encoder for CBOR data that extends Node.js Transform stream.

/**
 * Stream-based encoder for CBOR data
 */
class Encoder extends Transform {
  /**
   * Create encoder stream
   * @param {EncodingOptions} [options] - Encoding options
   */
  constructor(options);
  
  /**
   * Add custom semantic type encoder
   * @param {string|Function} type - Type name or constructor function
   * @param {EncodeFunction} func - Encoding function
   */
  addSemanticType(type, func);
  
  /**
   * Encode any supported type
   * @param {any} obj - Object to encode
   * @returns {boolean} Success indicator
   */
  pushAny(obj);
  
  /**
   * Clear loop detection state
   */
  removeLoopDetectors();
  
  /**
   * Reset semantic types to defaults
   */
  static reset();
  
  /**
   * Get/set semantic type mappings
   * @type {SemanticMap}
   */
  static SEMANTIC_TYPES;
}

Usage Examples:

const cbor = require("cbor");
const fs = require("fs");

// Create encoder stream with options
const encoder = new cbor.Encoder({
  canonical: true,
  dateType: 'string',
  detectLoops: true
});

// Pipe data through encoder
const readStream = fs.createReadStream('input.json');
const writeStream = fs.createWriteStream('output.cbor');

readStream
  .pipe(new JSONParseStream()) // Custom transform to parse JSON
  .pipe(encoder)
  .pipe(writeStream);

// Manual encoding with pushAny
const encoder2 = new cbor.Encoder();
encoder2.on('data', (chunk) => {
  console.log('Encoded chunk:', chunk);
});

encoder2.pushAny({ name: "Alice", age: 30 });
encoder2.pushAny([1, 2, 3, 4]);
encoder2.end();

Decoder Stream

Stream-based decoder for CBOR data that extends Node.js Transform stream.

/**
 * Stream-based decoder for CBOR data
 */
class Decoder extends Transform {
  /**
   * Create decoder stream
   * @param {DecoderOptions} [options] - Decoding options
   */
  constructor(options);
  
  /**
   * Stop processing and close the stream
   */
  close();
  
  /**
   * Convert CBOR null/undefined symbols to actual values
   * @param {any} val - Value to check and convert
   * @returns {any} Converted value
   */
  static nullcheck(val);
  
  /**
   * Is decoder currently running
   * @type {boolean}
   */
  running;
  
  /**
   * Maximum parsing depth
   * @type {number}
   */
  max_depth;
  
  /**
   * Tag number to function mappings
   * @type {TagMap}
   */
  tags;
  
  /**
   * Prefer Uint8Arrays over Buffers
   * @type {boolean}
   */
  preferWeb;
  
  /**
   * Emit extended result objects
   * @type {boolean}
   */
  extendedResults;
  
  /**
   * Error on no data
   * @type {boolean}
   */
  required;
  
  /**
   * Error on duplicate map keys
   * @type {boolean}
   */
  preventDuplicateKeys;
  
  /**
   * Bytes for decoded value
   * @type {NoFilter}
   */
  valueBytes;
}

Usage Examples:

const cbor = require("cbor");
const fs = require("fs");

// Create decoder stream with options
const decoder = new cbor.Decoder({
  max_depth: 64,
  preferMap: true,
  extendedResults: true
});

// Pipe CBOR data through decoder
const readStream = fs.createReadStream('data.cbor');
readStream
  .pipe(decoder)
  .on('data', (decoded) => {
    console.log('Decoded object:', decoded);
  })
  .on('error', (err) => {
    console.error('Decode error:', err);
  });

// Stream processing with custom tag handlers
const decoderWithTags = new cbor.Decoder({
  tags: {
    // Handle custom tag 100
    100: (value) => {
      return new MyCustomClass(value);
    },
    // Handle date strings
    0: (dateString) => {
      return new Date(dateString);
    }
  }
});

// Manual stream control
const decoder2 = new cbor.Decoder();
decoder2.on('data', (obj) => {
  console.log('Decoded:', obj);
  
  // Stop after first object
  decoder2.close();
});

// Write CBOR data to decoder
const encodedData = cbor.encode({ test: "data" });
decoder2.write(encodedData);

SharedValueEncoder Stream

Stream-based encoder that implements CBOR value sharing for object deduplication and smaller output sizes.

/**
 * Stream-based encoder with value sharing support
 */
class SharedValueEncoder extends Encoder {
  /**
   * Create shared value encoder stream
   * @param {EncodingOptions} [options] - Encoding options
   */
  constructor(options);
  
  /**
   * Object recorder for tracking duplicate values
   * @type {ObjectRecorder}
   */
  valueSharing;
  
  /**
   * Stop recording object references and begin emitting sharing tags
   */
  stopRecording();
  
  /**
   * Clear recorded objects and restart tracking
   */
  clearRecording();
  
  /**
   * Encode objects with value sharing (static method)
   * @param {...any} objs - Objects to encode
   * @returns {Buffer} CBOR encoded data with sharing tags
   */
  static encode(...objs);
  
  /**
   * Encode single object with value sharing (static method)
   * @param {any} obj - Object to encode
   * @param {EncodingOptions} [options] - Encoding options
   * @returns {Buffer} CBOR encoded data with sharing tags
   */
  static encodeOne(obj, options);
  
  /**
   * Async encode with value sharing (static method)
   * @param {any} obj - Object to encode
   * @param {EncodingOptions} [options] - Encoding options
   * @returns {Promise<Buffer>} Promise resolving to CBOR encoded data
   */
  static encodeAsync(obj, options);
  
  /**
   * Canonical encoding is not supported with value sharing
   * @param {...any} objs - Objects to encode
   * @throws {Error} Always throws - canonical encoding incompatible with value sharing
   */
  static encodeCanonical(...objs);
}

/**
 * Object recorder for tracking shared values
 */
class ObjectRecorder {
  constructor();
  
  /**
   * Object tracking map
   * @type {WeakMap}
   */
  map;
  
  /**
   * Shared object counter
   * @type {number}
   */
  count;
  
  /**
   * Whether currently recording objects
   * @type {boolean}
   */
  recording;
  
  /**
   * Clear tracked objects and restart recording
   */
  clear();
  
  /**
   * Stop recording mode
   */
  stop();
  
  /**
   * Check if object should be tagged for sharing
   * @param {object} obj - Object to check
   * @returns {number} Sharing status code
   */
  check(obj);
  
  /**
   * Never duplicated constant
   * @type {number}
   */
  static NEVER;
  
  /**
   * First occurrence constant  
   * @type {number}
   */
  static FIRST;
}

Usage Examples:

const cbor = require("cbor");

// Basic value sharing
const data = {
  user: { name: "Alice", id: 123 },
  permissions: ["read", "write"]
};

// Reuse the same objects multiple times
const complex = {
  admin: data.user,      // Will be shared
  editor: data.user,     // Reference to shared value
  viewer: data.user,     // Reference to shared value
  adminPerms: data.permissions,  // Will be shared
  editorPerms: data.permissions  // Reference to shared value
};

// Encode with value sharing (smaller output)
const sharedEncoded = cbor.SharedValueEncoder.encode(complex);
const regularEncoded = cbor.encode(complex);

console.log('Shared size:', sharedEncoded.length);
console.log('Regular size:', regularEncoded.length);

// Stream processing with value sharing
const sharedEncoder = new cbor.SharedValueEncoder();
const chunks = [];

sharedEncoder.on('data', (chunk) => {
  chunks.push(chunk);
});

sharedEncoder.on('finish', () => {
  const result = Buffer.concat(chunks);
  console.log('Encoded with sharing:', result);
});

// First pass - record objects
sharedEncoder.pushAny(complex);
sharedEncoder.stopRecording(); // Switch to sharing mode

// Second pass - emit with sharing tags
sharedEncoder.pushAny(complex);
sharedEncoder.end();

// Manual control of recording
const encoder = new cbor.SharedValueEncoder();
encoder.pushAny({ a: 1 });     // First occurrence  
encoder.pushAny({ a: 1 });     // Will be tagged for sharing
encoder.stopRecording();       // Stop recording phase
encoder.clearRecording();      // Clear for new encoding session

Streaming with Custom Semantic Types

Add custom type encoding/decoding to streams for application-specific data types.

/**
 * Semantic type encoder function for use with addSemanticType
 * @callback EncodeFunction
 * @param {Encoder} enc - The encoder instance
 * @param {any} val - The value to encode
 * @returns {boolean} True on success
 */

Usage Examples:

const cbor = require("cbor");

// Define custom class
class Person {
  constructor(name, age) {
    this.name = name;
    this.age = age;
  }
  
  toJSON() {
    return { name: this.name, age: this.age };
  }
}

// Create encoder with custom semantic type
const encoder = new cbor.Encoder();

// Add custom type encoder
encoder.addSemanticType(Person, (enc, person) => {
  // Encode as tagged value (tag 1000)
  enc.pushAny(new cbor.Tagged(1000, {
    name: person.name,
    age: person.age
  }));
  return true;
});

// Create decoder with custom tag handler
const decoder = new cbor.Decoder({
  tags: {
    1000: (obj) => new Person(obj.name, obj.age)
  }
});

// Stream processing
encoder.pipe(decoder);

decoder.on('data', (obj) => {
  console.log('Decoded person:', obj instanceof Person); // true
  console.log('Name:', obj.name, 'Age:', obj.age);
});

// Encode custom objects
encoder.write(new Person("Alice", 30));
encoder.write(new Person("Bob", 25));
encoder.end();

Stream Error Handling

Handle errors in streaming operations with proper error propagation.

Usage Examples:

const cbor = require("cbor");
const { pipeline } = require("stream");

// Error handling in streams
const encoder = new cbor.Encoder({ detectLoops: true });
const decoder = new cbor.Decoder({ max_depth: 10 });

encoder.on('error', (err) => {
  console.error('Encoder error:', err);
});

decoder.on('error', (err) => {
  console.error('Decoder error:', err);
});

// Using pipeline for automatic error handling
pipeline(
  sourceStream,
  encoder,
  decoder,
  destinationStream,
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline completed successfully');
    }
  }
);

// Graceful shutdown
process.on('SIGINT', () => {
  encoder.close();
  decoder.close();
});

Memory Management

Stream processing provides memory-efficient handling of large datasets.

Usage Examples:

const cbor = require("cbor");

// Process large datasets without loading everything into memory
function processLargeDataset(inputFile, outputFile) {
  const encoder = new cbor.Encoder({
    canonical: true,
    omitUndefinedProperties: true
  });
  
  const decoder = new cbor.Decoder({
    max_depth: 32,
    preferWeb: false
  });
  
  let processedCount = 0;
  
  decoder.on('data', (obj) => {
    // Process each object as it's decoded
    const processed = transformObject(obj);
    encoder.write(processed);
    processedCount++;
    
    if (processedCount % 1000 === 0) {
      console.log(`Processed ${processedCount} objects`);
    }
  });
  
  // Set up file streams
  fs.createReadStream(inputFile)
    .pipe(decoder);
    
  encoder
    .pipe(fs.createWriteStream(outputFile));
}

function transformObject(obj) {
  // Transform logic here
  return {
    ...obj,
    processed_at: new Date(),
    version: '2.0'
  };
}

Install with Tessl CLI

npx tessl i tessl/npm-cbor

docs

advanced-decoding.md

cbor-types.md

diagnostic-tools.md

encoding-decoding.md

index.md

integration-utilities.md

stream-processing.md

tile.json