Stream-based encoding and decoding for handling large datasets, real-time data processing, and memory-efficient operations using Node.js streams.
Stream-based encoder for CBOR data that extends Node.js Transform stream.
/**
 * Stream-based encoder for CBOR data.
 *
 * A Node.js Transform stream: JavaScript values written or pushed into it
 * are emitted as CBOR-encoded chunks via 'data' events (see the usage
 * examples that follow this declaration).
 * @extends Transform
 */
class Encoder extends Transform {
/**
 * Create encoder stream
 * @param {EncodingOptions} [options] - Encoding options (e.g. `canonical`,
 *   `dateType`, `detectLoops`, as shown in the examples below)
 */
constructor(options);
/**
 * Add custom semantic type encoder for an application-specific type.
 * @param {string|Function} type - Type name or constructor function
 * @param {EncodeFunction} func - Encoding function, invoked with
 *   (encoder, value); returns true on success
 */
addSemanticType(type, func);
/**
 * Encode any supported type, emitting the encoded bytes on the stream.
 * @param {any} obj - Object to encode
 * @returns {boolean} Success indicator
 */
pushAny(obj);
/**
 * Clear loop detection state
 * (used together with the `detectLoops` constructor option).
 */
removeLoopDetectors();
/**
 * Reset semantic types to defaults
 */
static reset();
/**
 * Get/set semantic type mappings
 * @type {SemanticMap}
 */
static SEMANTIC_TYPES;
}

Usage Examples:
// Usage example: piping data through an Encoder and manual pushAny() encoding.
const cbor = require("cbor");
const fs = require("fs");
// Create encoder stream with options
const encoder = new cbor.Encoder({
canonical: true,
dateType: 'string',
detectLoops: true
});
// Pipe data through encoder
const readStream = fs.createReadStream('input.json');
const writeStream = fs.createWriteStream('output.cbor');
// NOTE(review): JSONParseStream is not defined in this snippet — it stands
// in for a user-supplied Transform that parses JSON into objects.
readStream
.pipe(new JSONParseStream()) // Custom transform to parse JSON
.pipe(encoder)
.pipe(writeStream);
// Manual encoding with pushAny: each call emits encoded bytes as 'data' chunks
const encoder2 = new cbor.Encoder();
encoder2.on('data', (chunk) => {
console.log('Encoded chunk:', chunk);
});
encoder2.pushAny({ name: "Alice", age: 30 });
encoder2.pushAny([1, 2, 3, 4]);
encoder2.end();

Stream-based decoder for CBOR data that extends Node.js Transform stream.
/**
 * Stream-based decoder for CBOR data.
 *
 * A Node.js Transform stream: CBOR bytes written into it are emitted as
 * decoded JavaScript values via 'data' events (see the usage examples
 * that follow this declaration).
 * @extends Transform
 */
class Decoder extends Transform {
/**
 * Create decoder stream
 * @param {DecoderOptions} [options] - Decoding options (e.g. `max_depth`,
 *   `tags`, `extendedResults`, as shown in the examples below)
 */
constructor(options);
/**
 * Stop processing and close the stream
 * (useful for aborting after a desired number of decoded objects).
 */
close();
/**
 * Convert CBOR null/undefined symbols to actual values
 * @param {any} val - Value to check and convert
 * @returns {any} Converted value (unchanged if not a null/undefined symbol)
 */
static nullcheck(val);
/**
 * Is decoder currently running
 * @type {boolean}
 */
running;
/**
 * Maximum parsing depth (guards against deeply nested input)
 * @type {number}
 */
max_depth;
/**
 * Tag number to function mappings for custom tag handling
 * @type {TagMap}
 */
tags;
/**
 * Prefer Uint8Arrays over Buffers
 * @type {boolean}
 */
preferWeb;
/**
 * Emit extended result objects
 * @type {boolean}
 */
extendedResults;
/**
 * Error on no data
 * @type {boolean}
 */
required;
/**
 * Error on duplicate map keys
 * @type {boolean}
 */
preventDuplicateKeys;
/**
 * Bytes for decoded value
 * @type {NoFilter}
 */
valueBytes;
}

Usage Examples:
// Usage example: decoding a CBOR stream, custom tag handlers, manual control.
const cbor = require("cbor");
const fs = require("fs");
// Create decoder stream with options
const decoder = new cbor.Decoder({
max_depth: 64,
preferMap: true,
extendedResults: true
});
// Pipe CBOR data through decoder
const readStream = fs.createReadStream('data.cbor');
// NOTE(review): both .on() handlers attach to the decoder (the value
// returned by .pipe()); read-stream errors need their own handler.
readStream
.pipe(decoder)
.on('data', (decoded) => {
console.log('Decoded object:', decoded);
})
.on('error', (err) => {
console.error('Decode error:', err);
});
// Stream processing with custom tag handlers
// NOTE(review): MyCustomClass is a placeholder — define it before running.
const decoderWithTags = new cbor.Decoder({
tags: {
// Handle custom tag 100
100: (value) => {
return new MyCustomClass(value);
},
// Handle date strings
0: (dateString) => {
return new Date(dateString);
}
}
});
// Manual stream control
const decoder2 = new cbor.Decoder();
decoder2.on('data', (obj) => {
console.log('Decoded:', obj);
// Stop after first object
decoder2.close();
});
// Write CBOR data to decoder
const encodedData = cbor.encode({ test: "data" });
decoder2.write(encodedData);

Stream-based encoder that implements CBOR value sharing for object deduplication and smaller output sizes.
/**
 * Stream-based encoder with value sharing support.
 *
 * Extends Encoder with a two-pass workflow: a recording pass tracks
 * repeated object references, then (after stopRecording) an emitting pass
 * writes CBOR sharing tags so duplicated objects are encoded once.
 * @extends Encoder
 */
class SharedValueEncoder extends Encoder {
/**
 * Create shared value encoder stream
 * @param {EncodingOptions} [options] - Encoding options
 */
constructor(options);
/**
 * Object recorder for tracking duplicate values
 * @type {ObjectRecorder}
 */
valueSharing;
/**
 * Stop recording object references and begin emitting sharing tags
 */
stopRecording();
/**
 * Clear recorded objects and restart tracking
 * (use between independent encoding sessions).
 */
clearRecording();
/**
 * Encode objects with value sharing (static method)
 * @param {...any} objs - Objects to encode
 * @returns {Buffer} CBOR encoded data with sharing tags
 */
static encode(...objs);
/**
 * Encode single object with value sharing (static method)
 * @param {any} obj - Object to encode
 * @param {EncodingOptions} [options] - Encoding options
 * @returns {Buffer} CBOR encoded data with sharing tags
 */
static encodeOne(obj, options);
/**
 * Async encode with value sharing (static method)
 * @param {any} obj - Object to encode
 * @param {EncodingOptions} [options] - Encoding options
 * @returns {Promise<Buffer>} Promise resolving to CBOR encoded data
 */
static encodeAsync(obj, options);
/**
 * Canonical encoding is not supported with value sharing
 * @param {...any} objs - Objects to encode
 * @throws {Error} Always throws - canonical encoding incompatible with value sharing
 */
static encodeCanonical(...objs);
}
/**
 * Object recorder for tracking shared values.
 *
 * Backs SharedValueEncoder.valueSharing. Tracks objects in a WeakMap, so
 * duplicates are detected by object identity (the same reference seen
 * twice), not by structural equality.
 */
class ObjectRecorder {
constructor();
/**
 * Object tracking map (keyed by object identity)
 * @type {WeakMap}
 */
map;
/**
 * Shared object counter
 * @type {number}
 */
count;
/**
 * Whether currently recording objects
 * @type {boolean}
 */
recording;
/**
 * Clear tracked objects and restart recording
 */
clear();
/**
 * Stop recording mode
 */
stop();
/**
 * Check if object should be tagged for sharing
 * @param {object} obj - Object to check
 * @returns {number} Sharing status code (see NEVER / FIRST constants)
 */
check(obj);
/**
 * Never duplicated constant
 * @type {number}
 */
static NEVER;
/**
 * First occurrence constant
 * @type {number}
 */
static FIRST;
}

Usage Examples:
// Usage example: value sharing for deduplicated, smaller CBOR output.
const cbor = require("cbor");
// Basic value sharing
const data = {
user: { name: "Alice", id: 123 },
permissions: ["read", "write"]
};
// Reuse the same objects multiple times
const complex = {
admin: data.user, // Will be shared
editor: data.user, // Reference to shared value
viewer: data.user, // Reference to shared value
adminPerms: data.permissions, // Will be shared
editorPerms: data.permissions // Reference to shared value
};
// Encode with value sharing (smaller output)
const sharedEncoded = cbor.SharedValueEncoder.encode(complex);
const regularEncoded = cbor.encode(complex);
console.log('Shared size:', sharedEncoded.length);
console.log('Regular size:', regularEncoded.length);
// Stream processing with value sharing
const sharedEncoder = new cbor.SharedValueEncoder();
const chunks = [];
sharedEncoder.on('data', (chunk) => {
chunks.push(chunk);
});
sharedEncoder.on('finish', () => {
const result = Buffer.concat(chunks);
console.log('Encoded with sharing:', result);
});
// First pass - record objects
sharedEncoder.pushAny(complex);
sharedEncoder.stopRecording(); // Switch to sharing mode
// Second pass - emit with sharing tags
sharedEncoder.pushAny(complex);
sharedEncoder.end();
// Manual control of recording
const encoder = new cbor.SharedValueEncoder();
// NOTE(review): these are two distinct object literals; since ObjectRecorder
// tracks objects in a WeakMap (identity-based), they may NOT be detected as
// duplicates — confirm against the library before relying on this.
encoder.pushAny({ a: 1 }); // First occurrence
encoder.pushAny({ a: 1 }); // Will be tagged for sharing
encoder.stopRecording(); // Stop recording phase
encoder.clearRecording(); // Clear for new encoding session

Add custom type encoding/decoding to streams for application-specific data types.
/**
* Semantic type encoder function for use with addSemanticType
* @callback EncodeFunction
* @param {Encoder} enc - The encoder instance
* @param {any} val - The value to encode
* @returns {boolean} True on success
 */

Usage Examples:
// Usage example: round-tripping a custom class through a tagged CBOR value.
const cbor = require("cbor");
// Define custom class
class Person {
constructor(name, age) {
this.name = name;
this.age = age;
}
toJSON() {
return { name: this.name, age: this.age };
}
}
// Create encoder with custom semantic type
const encoder = new cbor.Encoder();
// Add custom type encoder: Person instances become tag-1000 wrapped maps
encoder.addSemanticType(Person, (enc, person) => {
// Encode as tagged value (tag 1000)
enc.pushAny(new cbor.Tagged(1000, {
name: person.name,
age: person.age
}));
return true;
});
// Create decoder with custom tag handler matching the tag used above
const decoder = new cbor.Decoder({
tags: {
1000: (obj) => new Person(obj.name, obj.age)
}
});
// Stream processing: encoder output feeds directly into the decoder
encoder.pipe(decoder);
decoder.on('data', (obj) => {
console.log('Decoded person:', obj instanceof Person); // true
console.log('Name:', obj.name, 'Age:', obj.age);
});
// Encode custom objects
encoder.write(new Person("Alice", 30));
encoder.write(new Person("Bob", 25));
encoder.end();

Handle errors in streaming operations with proper error propagation.
Usage Examples:
// Usage example: error handling and cleanup for CBOR streams.
const cbor = require("cbor");
const { pipeline } = require("stream");
// Error handling in streams
const encoder = new cbor.Encoder({ detectLoops: true });
const decoder = new cbor.Decoder({ max_depth: 10 });
encoder.on('error', (err) => {
console.error('Encoder error:', err);
});
decoder.on('error', (err) => {
console.error('Decoder error:', err);
});
// Using pipeline for automatic error handling
// NOTE(review): sourceStream and destinationStream are placeholders for
// user-supplied readable/writable streams.
pipeline(
sourceStream,
encoder,
decoder,
destinationStream,
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline completed successfully');
}
}
);
// Graceful shutdown
// NOTE(review): the Encoder declaration above documents no close() method
// (only Decoder declares one) — encoder.end() may be the intended call;
// verify against the library.
process.on('SIGINT', () => {
encoder.close();
decoder.close();
});

Stream processing provides memory-efficient handling of large datasets.
Usage Examples:
const cbor = require("cbor");
// Bug fix: fs is used below (createReadStream/createWriteStream) but was
// never required in this self-contained example.
const fs = require("fs");

/**
 * Re-encode a CBOR file object-by-object without loading the whole
 * dataset into memory: decode a stream of objects, transform each one,
 * and write the results to a new CBOR file.
 * @param {string} inputFile - Path to the CBOR input file
 * @param {string} outputFile - Path for the re-encoded CBOR output file
 */
function processLargeDataset(inputFile, outputFile) {
  const encoder = new cbor.Encoder({
    canonical: true,
    omitUndefinedProperties: true
  });
  const decoder = new cbor.Decoder({
    max_depth: 32,
    preferWeb: false
  });
  let processedCount = 0;
  decoder.on('data', (obj) => {
    // Process each object as it's decoded
    const processed = transformObject(obj);
    encoder.write(processed);
    processedCount++;
    if (processedCount % 1000 === 0) {
      console.log(`Processed ${processedCount} objects`);
    }
  });
  // Bug fix: finish the encoder once all input has been decoded, so the
  // output write stream is flushed and closed. The original never ended
  // the encoder, leaving the output file incomplete.
  decoder.on('end', () => encoder.end());
  // Set up file streams
  fs.createReadStream(inputFile)
    .pipe(decoder);
  encoder
    .pipe(fs.createWriteStream(outputFile));
}

/**
 * Example per-object transform: stamp each record with processing metadata.
 * @param {object} obj - Decoded object
 * @returns {object} Shallow copy with `processed_at` and `version` added
 */
function transformObject(obj) {
  // Transform logic here
  return {
    ...obj,
    processed_at: new Date(),
    version: '2.0'
  };
}