tessl/npm-msgpackr

Ultra-fast MessagePack implementation with extensions for records and structured cloning

Streaming

msgpackr provides two Node.js Transform streams, PackrStream and UnpackrStream, that encode and decode MessagePack over continuous data flows.

Capabilities

PackrStream Class

Transform stream that converts JavaScript objects to MessagePack binary data, suitable for network transmission or file storage.

/**
 * Transform stream for packing objects to MessagePack binary format
 * Extends Node.js Transform stream with object mode input and binary output
 */
class PackrStream extends Transform {
  constructor(options?: Options | StreamOptions);
}

interface StreamOptions {
  highWaterMark?: number;
  emitClose?: boolean;
  allowHalfOpen?: boolean;
}

Usage Examples:

import { PackrStream } from "msgpackr";
import { createWriteStream } from "fs";

// Basic streaming to file
const packrStream = new PackrStream();
const fileStream = createWriteStream("data.msgpack");

packrStream.pipe(fileStream);

// Write objects to stream
packrStream.write({ id: 1, name: "Alice" });
packrStream.write({ id: 2, name: "Bob" });
packrStream.end();

// With custom options
const optimizedStream = new PackrStream({
  useRecords: true,
  sequential: true,
  highWaterMark: 16384
});

// Network streaming example
import { createServer } from "net";

const server = createServer((socket) => {
  const packrStream = new PackrStream({ useRecords: true });
  packrStream.pipe(socket);
  
  // Stream data to client
  const data = [
    { type: "user", id: 1, name: "Alice" },
    { type: "user", id: 2, name: "Bob" },
    { type: "message", text: "Hello World" }
  ];
  
  data.forEach(item => packrStream.write(item));
  packrStream.end();
});
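
The examples above call `write()` in a loop and ignore its return value. That is fine for a handful of objects, but it lets memory grow when the consumer is slower than the producer. A minimal sketch of a backpressure-aware writer follows. `writeAll` and `demo` are hypothetical helpers, not part of msgpackr, and the `PassThrough` stream only stands in for a `PackrStream` so the snippet runs on Node's standard library alone.

```javascript
import { once } from "node:events";
import { PassThrough } from "node:stream";

// Hypothetical helper: write every item, waiting for 'drain' whenever
// write() reports that the stream's internal buffer is full.
async function writeAll(stream, items) {
  for (const item of items) {
    if (!stream.write(item)) {
      await once(stream, "drain");
    }
  }
  stream.end();
}

// Stand-in for `new PackrStream()`: any object-mode writable behaves the same way.
async function demo() {
  const encoder = new PassThrough({ objectMode: true, highWaterMark: 1 });
  const received = [];
  encoder.on("data", (item) => received.push(item));
  const ended = once(encoder, "end"); // register before writing starts

  await writeAll(encoder, [
    { id: 1, name: "Alice" },
    { id: 2, name: "Bob" },
    { id: 3, name: "Carol" },
  ]);
  await ended;
  return received;
}
```

With a real encoder the call would look like `await writeAll(packrStream, data)` in place of the `forEach` loop.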

UnpackrStream Class

Transform stream that converts MessagePack binary data back to JavaScript objects, handling incomplete data and stream boundaries automatically.

/**
 * Transform stream for unpacking MessagePack binary data to objects
 * Extends Node.js Transform stream with binary input and object mode output
 */
class UnpackrStream extends Transform {
  constructor(options?: Options | StreamOptions);
}

Usage Examples:

import { UnpackrStream } from "msgpackr";
import { createReadStream } from "fs";

// Basic streaming from file
const unpackrStream = new UnpackrStream();
const fileStream = createReadStream("data.msgpack");

fileStream.pipe(unpackrStream);

unpackrStream.on('data', (object) => {
  console.log('Received object:', object);
});

// With structured cloning support
const cloningStream = new UnpackrStream({
  structuredClone: true,
  mapsAsObjects: true
});

// Network client example
import { connect } from "net";

const client = connect(8080, () => {
  const unpackrStream = new UnpackrStream({ useRecords: true });
  
  client.pipe(unpackrStream);
  
  unpackrStream.on('data', (data) => {
    console.log('Received from server:', data);
  });
});

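// Handle incomplete data gracefully
// A value split across chunks is normally buffered by the stream until the
// rest arrives, so the `incomplete` branch below is a defensive check
const robustStream = new UnpackrStream();

robustStream.on('error', (error) => {
  if (error.incomplete) {
    console.log('Incomplete data, waiting for more data');
  } else {
    console.error('Stream error:', error);
  }
});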
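
Sockets and file reads deliver bytes in chunks whose boundaries have nothing to do with MessagePack value boundaries, which is why the decoder has to reassemble split values. One way to exercise that path in a test is to put a re-chunking stage in front of the decoder. The sketch below uses only Node's standard library. `rechunk` is a hypothetical helper, not a msgpackr API.

```javascript
import { Transform } from "node:stream";

// Hypothetical test helper: re-emit incoming bytes in pieces of at most
// `size` bytes, so a downstream decoder sees values split across chunks.
function rechunk(size) {
  return new Transform({
    transform(chunk, encoding, callback) {
      for (let offset = 0; offset < chunk.length; offset += size) {
        this.push(chunk.subarray(offset, offset + size));
      }
      callback();
    },
  });
}
```

In a test this could sit between the source and the decoder, for example `source.pipe(rechunk(3)).pipe(new UnpackrStream())`, and the decoded objects should match what was originally written.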

Bidirectional Streaming

Combine a PackrStream and an UnpackrStream on the same socket for full-duplex communication.

Usage Examples:

import { PackrStream, UnpackrStream } from "msgpackr";
import { connect } from "net";

// Client-side bidirectional streaming
const client = connect(8080, () => {
  const packrStream = new PackrStream({ useRecords: true });
  const unpackrStream = new UnpackrStream({ useRecords: true });
  
  // Setup bidirectional pipes
  packrStream.pipe(client);
  client.pipe(unpackrStream);
  
  // Send data to server
  packrStream.write({ command: "login", user: "alice" });
  packrStream.write({ command: "getData", id: 123 });
  
  // Receive responses
  unpackrStream.on('data', (response) => {
    console.log('Server response:', response);
  });
});

// Server-side echo example
import { createServer } from "net";

const server = createServer((socket) => {
  const packrStream = new PackrStream();
  const unpackrStream = new UnpackrStream();
  
  // Setup echo pipeline
  socket.pipe(unpackrStream);
  packrStream.pipe(socket);
  
  unpackrStream.on('data', (data) => {
    console.log('Received:', data);
    // Echo back with timestamp
    packrStream.write({
      echo: data,
      timestamp: new Date(),
      server: "echo-1"
    });
  });
});

server.listen(8080);
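
The echo server above reacts to 'data' events and never ends its encoder. An alternative shape is to consume the decoder with `for await` and end the encoder once the peer stops sending. This sketch assumes the decoder behaves like an ordinary object-mode Readable, which the readable side of a Transform does. `serve`, `handler` and `demo` are hypothetical names, and the `PassThrough` streams stand in for `UnpackrStream` and `PackrStream`.

```javascript
import { once } from "node:events";
import { PassThrough } from "node:stream";

// Hypothetical request/response loop: read decoded messages one at a time,
// write a reply for each, and end the encoder once the input ends.
async function serve(decoder, encoder, handler) {
  for await (const message of decoder) {
    const reply = await handler(message);
    if (!encoder.write(reply)) {
      await once(encoder, "drain"); // respect backpressure on the way out
    }
  }
  encoder.end();
}

// Stand-ins for `new UnpackrStream()` and `new PackrStream()`.
async function demo() {
  const decoder = new PassThrough({ objectMode: true });
  const encoder = new PassThrough({ objectMode: true });
  const replies = [];
  encoder.on("data", (reply) => replies.push(reply));
  const ended = once(encoder, "end");

  decoder.write({ command: "login", user: "alice" });
  decoder.end({ command: "getData", id: 123 });

  await serve(decoder, encoder, (message) => ({ echo: message, server: "echo-1" }));
  await ended;
  return replies;
}
```

Because the loop awaits each reply before reading the next message, a slow handler or a slow client naturally slows down reading from the socket as well.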

Stream Pipeline Patterns

Because both classes are standard Transform streams, they compose with stream.pipeline and with custom object-mode Transform stages.

Usage Examples:

import { PackrStream, UnpackrStream } from "msgpackr";
import { Transform, pipeline } from "stream";

// Data transformation pipeline
const transformStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Transform the data
    const transformed = {
      ...chunk,
      processed: true,
      timestamp: Date.now()
    };
    callback(null, transformed);
  }
});

// Complete processing pipeline
// (inputStream and outputStream are placeholders for your own source and destination)
pipeline(
  inputStream,                    // Source data
  new UnpackrStream(),           // Unpack binary data
  transformStream,               // Transform objects
  new PackrStream(),             // Pack back to binary
  outputStream,                  // Destination
  (error) => {
    if (error) {
      console.error('Pipeline error:', error);
    } else {
      console.log('Pipeline completed successfully');
    }
  }
);

// Filtering stream example
const filterStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Only pass through objects matching criteria
    if (chunk.type === 'important') {
      callback(null, chunk);
    } else {
      callback(); // Skip this object
    }
  }
});

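// Multi-stage processing
// (processDataAsync is a placeholder for your own promise-returning function)
const processingStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Async processing: signal completion or failure through the callback
    processDataAsync(chunk)
      .then(result => callback(null, result))
      .catch(error => callback(error));
  }
});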
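
The fragments above leave `inputStream`, `outputStream` and `processDataAsync` undefined. As a self-contained illustration of how object-mode stages compose, the sketch below runs a filter stage and an enrichment stage between an in-memory source and sink. It deliberately omits the msgpackr stages so that it runs on Node's standard library alone. In a real pipeline an `UnpackrStream` would sit before the first stage and a `PackrStream` after the last. `keepImportant`, `markProcessed` and `runStages` are hypothetical names.

```javascript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Pass through only objects whose type is 'important'.
const keepImportant = () => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    if (chunk.type === "important") callback(null, chunk);
    else callback(); // drop everything else
  },
});

// Mark each object as processed.
const markProcessed = () => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, { ...chunk, processed: true });
  },
});

// Hypothetical driver: push `items` through both stages and collect the output.
async function runStages(items) {
  const output = [];
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      output.push(chunk);
      callback();
    },
  });
  await pipeline(Readable.from(items), keepImportant(), markProcessed(), sink);
  return output;
}
```

The promise-based `pipeline` from `node:stream/promises` rejects if any stage fails, so a single `try`/`catch` around the call covers the whole chain.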

Error Handling and Recovery

UnpackrStream buffers values that arrive split across chunks and reports data it cannot decode through the stream's 'error' event.

import { UnpackrStream } from "msgpackr";

const unpackrStream = new UnpackrStream();

unpackrStream.on('error', (error) => {
  if (error.incomplete) {
    // Incomplete MessagePack data: the stream normally buffers this itself
    // and resumes when more data arrives, so this branch is defensive
    console.log('Incomplete data at position:', error.lastPosition);
    console.log('Successfully parsed values:', error.values);
  } else {
    // Other errors (malformed data, etc.)
    console.error('Stream error:', error.message);
    // A Node stream is destroyed after emitting 'error' by default,
    // so create a new UnpackrStream to continue reading
  }
});

// Monitor stream health
unpackrStream.on('pipe', (src) => {
  console.log('Stream connected to source');
});

unpackrStream.on('unpipe', (src) => {
  console.log('Stream disconnected from source');
});
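
A note on the non-recoverable case: `pipe()` neither forwards an error to the rest of the chain nor tears the other streams down, while `pipeline` does both. The sketch below shows that behavior with a stand-in decoder that rejects any chunk not starting with an expected marker byte. `strictDecoder`, `decodeAll` and the marker value are illustrative assumptions, not msgpackr behavior.

```javascript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Stand-in for a decoder: fails on any chunk whose first byte is not 0x01.
const strictDecoder = () => new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    if (chunk[0] !== 0x01) {
      callback(new Error("malformed chunk"));
    } else {
      callback(null, { payload: chunk.subarray(1).toString() });
    }
  },
});

// Run the chain and report what happened instead of throwing.
async function decodeAll(chunks) {
  const decoder = strictDecoder();
  const decoded = [];
  const sink = new Writable({
    objectMode: true,
    write(value, encoding, callback) {
      decoded.push(value);
      callback();
    },
  });
  try {
    await pipeline(Readable.from(chunks), decoder, sink);
    return { ok: true, decoded };
  } catch (error) {
    // The failed decoder is destroyed; a fresh one is needed to continue.
    return { ok: false, decoded, error, decoderDestroyed: decoder.destroyed };
  }
}
```

Recovery in this model means constructing a new decoder and a new pipeline, which matches the "restart" advice in the handler above.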

Performance Optimization

Encoder options and stream buffer sizes can be tuned for high-volume streams.

import { PackrStream, UnpackrStream } from "msgpackr";

// High-throughput configuration
const highThroughputOptions = {
  useRecords: true,
  sequential: true,
  bundleStrings: true,
  highWaterMark: 65536  // Larger buffer; on an object-mode side this counts objects, not bytes
};

// Give each stream its own copy so the two constructors never share one options object
const packrStream = new PackrStream({ ...highThroughputOptions });
const unpackrStream = new UnpackrStream({ ...highThroughputOptions });

// Monitor performance
let processedCount = 0;
const startTime = Date.now();

unpackrStream.on('data', (data) => {
  processedCount++;
  if (processedCount % 1000 === 0) {
    const elapsed = Date.now() - startTime;
    const rate = processedCount / (elapsed / 1000);
    console.log(`Processing rate: ${rate.toFixed(2)} objects/sec`);
  }
});
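
One detail worth keeping in mind when tuning `highWaterMark`: Node counts it in objects on an object-mode side and in bytes otherwise, and a single `highWaterMark` value applies to both sides of a Transform. The standard `writableHighWaterMark` and `readableHighWaterMark` options set each side separately. The sketch below demonstrates this on a plain Transform shaped like an encoder (objects in, bytes out). Whether `PackrStream` forwards these two options unchanged is an assumption to verify against the msgpackr source, and `makeEncoder` is a hypothetical name.

```javascript
import { Transform } from "node:stream";

// Encoder-shaped stand-in: object-mode input, byte output (JSON lines here,
// purely for illustration; PackrStream would emit MessagePack instead).
function makeEncoder(options) {
  return new Transform({
    ...options,
    writableObjectMode: true,
    transform(value, encoding, callback) {
      callback(null, Buffer.from(JSON.stringify(value) + "\n"));
    },
  });
}

// One shared value: the input side is limited to 65536 queued objects,
// the output side to 65536 buffered bytes.
const shared = makeEncoder({ highWaterMark: 65536 });

// Separate limits: 64 queued objects in, 65536 bytes out.
const tuned = makeEncoder({
  writableHighWaterMark: 64,
  readableHighWaterMark: 65536,
});

console.log(shared.writableHighWaterMark, shared.readableHighWaterMark);
console.log(tuned.writableHighWaterMark, tuned.readableHighWaterMark);
```

For an encoder, a small object limit on the input side with a larger byte limit on the output side is usually closer to the intent of "a bigger buffer" than one large shared number.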
