CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/npm-msgpackr

Ultra-fast MessagePack implementation with extensions for records and structured cloning

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/iterators.md

Iterator Support

Lazy evaluation support for processing large datasets efficiently using JavaScript iterators and async iterators, enabling memory-efficient handling of massive data collections.

Capabilities

packIter Function

Converts an iterable source of objects into an iterable of MessagePack buffers, supporting both synchronous and asynchronous iteration patterns.

/**
 * Creates an iterable that packs objects from the source iterable to MessagePack buffers.
 *
 * The kind of iterator returned follows the kind of source: a synchronous
 * source is consumed with `for...of`, an asynchronous source with
 * `for await...of`. The async result is itself directly iterable; it is not
 * wrapped in a Promise that has to be awaited first.
 *
 * @param objectIterator - Source iterable/iterator (sync or async)
 * @param options - msgpackr pack options
 * @returns IterableIterator for sync sources, AsyncIterableIterator for async sources
 */
function packIter(
  objectIterator:
    | Iterable<unknown>
    | Iterator<unknown>
    | AsyncIterable<unknown>
    | AsyncIterator<unknown>,
  options?: Options
): IterableIterator<Buffer> | AsyncIterableIterator<Buffer>;

Usage Examples:

import { packIter } from "msgpackr";

// Pack every element of a plain array
const data = [
  { id: 1, name: "Alice" },
  { id: 2, name: "Bob" },
  { id: 3, name: "Charlie" }
];

for (const packed of packIter(data)) {
  // Each packed value is ready to be sent over the network, written to a file, etc.
  console.log('Packed buffer size:', packed.length);
}

// Generator function source
/**
 * Lazily produces 1000 user records with ids 1 through 1000.
 * A record (including its `created` timestamp) is only built when the
 * consumer requests the next value.
 */
function* generateUsers() {
  let id = 0;
  while (id < 1000) {
    id += 1;
    yield { id, name: `User${id}`, created: new Date() };
  }
}

// Stream a large generated dataset through the packer
for (const packedUser of packIter(generateUsers(), { useRecords: true })) {
  // Buffers are handled one by one, so only the current object is held in memory
  await sendToServer(packedUser);
}

// A Set is iterable too, so it can be used as a source directly
const userSet = new Set([
  { role: "admin", name: "Alice" },
  { role: "user", name: "Bob" }
]);

for (const packedMember of packIter(userSet)) {
  console.log('Packed set item');
}

Async Iterator Support

import { packIter } from "msgpackr";

// Async generator source
/**
 * Requests pages 1 through 10 of `/api/users` one after another and yields
 * the users of each page individually, in page order.
 */
async function* fetchUserPages() {
  for (let page = 1; page <= 10; page++) {
    const response = await fetch(`/api/users?page=${page}`);
    const users = await response.json();
    for (const user of users) {
      yield user;
    }
  }
}

// Pack the values of an async source as they arrive
const asyncIterable = packIter(fetchUserPages(), {
  useRecords: true,
  structuredClone: true
});

for await (const packed of asyncIterable) {
  console.log('Processed async buffer:', packed.length);
}

// A promise for an async iterator used as the source
const promiseIterator = Promise.resolve(someAsyncIterator);
const packedPromise = packIter(promiseIterator);

for await (const packed of packedPromise) {
  // Handle packed buffers from promise-resolved iterator
}

unpackIter Function

Converts an iterable source of MessagePack buffers into an iterable of JavaScript objects, handling incomplete data and buffer boundaries automatically.

/**
 * Creates an iterable that unpacks MessagePack buffers to objects.
 *
 * The kind of iterator returned follows the kind of source: a synchronous
 * source is consumed with `for...of`, an asynchronous source with
 * `for await...of`. The async result is itself directly iterable; it is not
 * wrapped in a Promise that has to be awaited first.
 *
 * @param bufferIterator - Source iterable/iterator of buffers (sync or async)
 * @param options - msgpackr unpack options
 * @returns IterableIterator for sync sources, AsyncIterableIterator for async sources
 */
function unpackIter(
  bufferIterator:
    | Iterable<Buffer | Uint8Array>
    | Iterator<Buffer | Uint8Array>
    | AsyncIterable<Buffer | Uint8Array>
    | AsyncIterator<Buffer | Uint8Array>,
  options?: Options
): IterableIterator<any> | AsyncIterableIterator<any>;

Usage Examples:

import { unpackIter } from "msgpackr";

// An array of MessagePack buffers is a valid synchronous source
const buffers = [buffer1, buffer2, buffer3];

for (const value of unpackIter(buffers)) {
  console.log('Unpacked object:', value);
}

// File reading with chunks
import { createReadStream } from "fs";

/**
 * Opens `filename` as a read stream and yields its chunks in order.
 */
async function* readFileChunks(filename) {
  // The read stream is async iterable, so delegation forwards each chunk unchanged
  yield* createReadStream(filename);
}

// A file's chunks may hold many MessagePack values; iterate them as objects
for await (const value of unpackIter(readFileChunks("data.msgpack"))) {
  // Values arrive already unpacked from the chunk stream
  console.log('Object from file:', value);
}

// Network data processing
/**
 * Connects to the server and yields every chunk received from the
 * connection, converted to a Buffer.
 */
async function* receiveNetworkData() {
  const connection = await connectToServer();
  for await (const piece of connection) {
    const asBuffer = Buffer.from(piece);
    yield asBuffer;
  }
}

// Unpack the incoming chunks with record support enabled
for await (const message of unpackIter(receiveNetworkData(), { useRecords: true })) {
  console.log('Received network data:', message);
}

Handling Incomplete Data

The unpackIter function automatically handles incomplete MessagePack data across buffer boundaries.

import { unpackIter } from "msgpackr";

// Generator that yields partial buffers
/**
 * Splits a complete MessagePack buffer into fixed-size pieces so that value
 * boundaries can fall in the middle of a chunk.
 *
 * @param fullBuffer - Complete MessagePack buffer to split. Defaults to a
 *   35-byte encoding of the array [0, 1, ..., 31] (array16 header followed by
 *   32 fixints) so the example runs as written.
 * @param chunkSize - Maximum number of bytes per yielded chunk (default 10)
 * @throws RangeError if `chunkSize` is not a positive integer
 */
function* partialBuffers(
  fullBuffer = Buffer.from([0xdc, 0x00, 0x20, ...Array.from({ length: 32 }, (_, i) => i)]),
  chunkSize = 10
) {
  // A non-positive step would never advance the offset and loop forever
  if (!Number.isInteger(chunkSize) || chunkSize <= 0) {
    throw new RangeError('chunkSize must be a positive integer');
  }

  // Yield buffer in small chunks that may split MessagePack boundaries
  for (let offset = 0; offset < fullBuffer.length; offset += chunkSize) {
    // subarray returns a view without copying; Buffer#slice is deprecated
    yield fullBuffer.subarray(offset, offset + chunkSize);
  }
}

try {
  for (const value of unpackIter(partialBuffers())) {
    // Values whose MessagePack data spans several chunks
    // still arrive here as whole objects
    console.log('Reconstructed object:', value);
  }
} catch (err) {
  // Only errors flagged as incomplete are reported; they expose `values` and `lastPosition`
  if (err.incomplete) {
    console.log('Incomplete data at end:', err.values);
    console.log('Last position:', err.lastPosition);
  }
}

Encode/Decode Aliases

Alternative naming conventions for the iterator functions.

/**
 * Alias for packIter - encodes objects to MessagePack buffers.
 * Declared with the same type as packIter, so it takes the same
 * arguments and has the same return type.
 */
const encodeIter: typeof packIter;

/**
 * Alias for unpackIter - decodes MessagePack buffers to objects.
 * Declared with the same type as unpackIter, so it takes the same
 * arguments and has the same return type.
 */
const decodeIter: typeof unpackIter;

Usage Examples:

import { encodeIter, decodeIter } from "msgpackr";

// Round trip using the encode/decode names
const objects = [{ a: 1 }, { b: 2 }];
const buffers = [];

for (const encoded of encodeIter(objects)) {
  buffers.push(encoded);
}

for (const decoded of decodeIter(buffers)) {
  console.log('Decoded:', decoded);
}

Advanced Usage Patterns

Pipeline Processing

Combining iterators for complex data processing pipelines.

import { packIter, unpackIter } from "msgpackr";

// Transform pipeline: objects -> buffers -> objects
/**
 * Packs every item of `sourceData`, unpacks the resulting buffers again and
 * yields each unpacked object extended with `processed: true` and a fresh
 * `timestamp`.
 */
async function* transformPipeline(sourceData) {
  // Stage 1: pack everything up front. The array stands in for a network
  // transmission or storage step between packing and unpacking.
  const networkBuffers = [...packIter(sourceData, { useRecords: true })];

  // Stage 2: unpack and decorate each object
  for (const unpacked of unpackIter(networkBuffers, { useRecords: true })) {
    const transformed = { ...unpacked, processed: true, timestamp: new Date() };
    yield transformed;
  }
}

// Feed a dataset through the pipeline and consume the transformed objects
const sourceData = generateLargeDataset();
for await (const transformed of transformPipeline(sourceData)) {
  console.log('Processed:', transformed);
}

Memory-Efficient Batch Processing

import { packIter, unpackIter } from "msgpackr";

// Process large datasets in chunks without loading everything into memory

/**
 * Packs one batch, then unpacks the packed buffers again and yields the
 * resulting items in order.
 */
function* roundTripBatch(batch) {
  const packedBatch = [...packIter(batch, { useRecords: true })];
  for (const result of unpackIter(packedBatch, { useRecords: true })) {
    yield result;
  }
}

/**
 * Collects items from `dataSource` into batches and round-trips each batch
 * through packIter/unpackIter, yielding the unpacked results.
 *
 * Full batches and the final partial batch go through the same helper, so
 * every batch is packed and unpacked in exactly the same way.
 *
 * @param dataSource - Sync or async iterable of items to process
 * @param batchSize - Number of items collected before a batch is flushed (default 100)
 */
async function* batchProcessor(dataSource, batchSize = 100) {
  let batch = [];

  for await (const item of dataSource) {
    batch.push(item);

    if (batch.length >= batchSize) {
      yield* roundTripBatch(batch);
      batch = []; // Clear batch to free memory
    }
  }

  // Flush whatever is left once the source is exhausted
  if (batch.length > 0) {
    yield* roundTripBatch(batch);
  }
}

Error Handling in Iterator Chains

import { packIter, unpackIter } from "msgpackr";

/**
 * Packs every item of `source`, unpacks the buffers again and yields the
 * unpacked objects, recovering from incomplete-data errors.
 *
 * Any error is logged. If the error is flagged `incomplete`, the values it
 * carries in `error.values` are yielded and the generator finishes normally.
 * Every other error is re-thrown to the consumer.
 *
 * @param source - Iterable of objects to round-trip
 * @throws Any error that is not flagged as `incomplete`
 */
function* safeIteratorChain(source) {
  try {
    // Pack with error handling
    const buffers = [];
    for (const buffer of packIter(source, { useRecords: true })) {
      buffers.push(buffer);
    }

    // Unpack with error recovery
    for (const object of unpackIter(buffers, { useRecords: true })) {
      yield object;
    }
  } catch (error) {
    console.error('Iterator chain error:', error);

    // Optional chaining guards against a thrown null/undefined
    if (!error?.incomplete) {
      throw error; // Not recoverable: let the consumer handle it
    }

    // Recoverable: hand out the values carried by the error, then end
    // normally instead of re-throwing
    console.log('Recovered partial data:', error.values);
    for (const partialValue of error.values || []) {
      yield partialValue;
    }
  }
}

Performance Considerations

  • Memory Efficiency: Iterators process one item at a time, keeping memory usage constant
  • Record Structures: Use useRecords: true for optimal performance with repeated structures
  • Async vs Sync: Choose the iterator type that matches the data source — sync for data already in memory, async for sources that deliver data over time (network, files)
  • Buffer Sizes: Consider buffer sizes when dealing with network or file I/O
  • Error Recovery: Implement proper error handling for production use cases
// Option set used for high-performance iteration
const performantOptions = {
  useRecords: true,
  sequential: true,
  bundleStrings: true
};

for (const packed of packIter(largeDataset, performantOptions)) {
  // Optimized processing
}

docs

advanced-classes.md

core-operations.md

extensions.md

index.md

iterators.md

streaming.md

tile.json