Ultra-fast MessagePack implementation with extensions for records and structured cloning
Transform streams for processing continuous data flows with automatic MessagePack encoding and decoding, optimized for Node.js stream-based applications.
Transform stream that converts JavaScript objects to MessagePack binary data, suitable for network transmission or file storage.
/**
* Transform stream for packing objects to MessagePack binary format
* Extends Node.js Transform stream with object mode input and binary output
*/
class PackrStream extends Transform {
  constructor(options?: Options | StreamOptions);
}

interface StreamOptions {
  highWaterMark?: number;
  emitClose?: boolean;
  allowHalfOpen?: boolean;
}

Usage Examples:
import { PackrStream } from "msgpackr";
import { createWriteStream } from "fs";
// Basic streaming to file
const packrStream = new PackrStream();
const fileStream = createWriteStream("data.msgpack");
packrStream.pipe(fileStream);
// Write objects to stream
packrStream.write({ id: 1, name: "Alice" });
packrStream.write({ id: 2, name: "Bob" });
packrStream.end();
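Because PackrStream emits plain binary chunks, its output can be captured with standard stream events before it ever reaches a file or socket. Below is a minimal sketch of a `collect` helper (a name invented here, not part of msgpackr) that gathers a binary stream into one Buffer; a PassThrough stands in for a PackrStream so the sketch runs with Node core alone:

```javascript
import { PassThrough } from "node:stream";

// Gather every chunk a binary Readable emits into a single Buffer.
// The same helper works on a PackrStream's readable side.
function collect(stream) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on("data", (chunk) => chunks.push(chunk));
    stream.on("end", () => resolve(Buffer.concat(chunks)));
    stream.on("error", reject);
  });
}

// Demo: chunks may arrive in arbitrary sizes; collect() reassembles them.
const demo = new PassThrough();
const collected = collect(demo);
demo.write(Buffer.from([0x82, 0xa2]));
demo.write(Buffer.from([0x69, 0x64]));
demo.end();
```

The resulting Buffer can then be written to disk in one call or handed to a synchronous `unpack` for inspection.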
// With custom options
const optimizedStream = new PackrStream({
useRecords: true,
sequential: true,
highWaterMark: 16384
});
// Network streaming example
import { createServer } from "net";
const server = createServer((socket) => {
const packrStream = new PackrStream({ useRecords: true });
packrStream.pipe(socket);
// Stream data to client
const data = [
{ type: "user", id: 1, name: "Alice" },
{ type: "user", id: 2, name: "Bob" },
{ type: "message", text: "Hello World" }
];
data.forEach(item => packrStream.write(item));
packrStream.end();
});

Transform stream that converts MessagePack binary data back to JavaScript objects, handling incomplete data and stream boundaries automatically.
/**
* Transform stream for unpacking MessagePack binary data to objects
* Extends Node.js Transform stream with binary input and object mode output
*/
class UnpackrStream extends Transform {
  constructor(options?: Options | StreamOptions);
}

Usage Examples:
import { UnpackrStream } from "msgpackr";
import { createReadStream } from "fs";
// Basic streaming from file
const unpackrStream = new UnpackrStream();
const fileStream = createReadStream("data.msgpack");
fileStream.pipe(unpackrStream);
unpackrStream.on('data', (object) => {
console.log('Received object:', object);
});
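UnpackrStream can split a byte stream back into objects because MessagePack values are self-delimiting: each value's first byte determines how many bytes the value occupies, so values are simply concatenated with no separators. A toy decoder for just two type families from the MessagePack spec (positive fixint, 0x00-0x7f, and fixstr, 0xa0-0xbf) illustrates the principle; the real decoder applies the same idea to every type:

```javascript
// Toy MessagePack decoder, for illustration only.
// Positive fixint: bytes 0x00-0x7f encode the integer itself.
// fixstr: bytes 0xa0-0xbf; the low 5 bits give the UTF-8 byte length.
function decodeOne(buf, pos) {
  const b = buf[pos];
  if (b <= 0x7f) {
    return { value: b, next: pos + 1 };
  }
  if (b >= 0xa0 && b <= 0xbf) {
    const len = b & 0x1f;
    return { value: buf.toString("utf8", pos + 1, pos + 1 + len), next: pos + 1 + len };
  }
  throw new Error("type not handled in this sketch");
}

// Walk a buffer of concatenated values; no delimiters are needed
// because each value reports where the next one starts.
function decodeAll(buf) {
  const out = [];
  let pos = 0;
  while (pos < buf.length) {
    const { value, next } = decodeOne(buf, pos);
    out.push(value);
    pos = next;
  }
  return out;
}
```

If a value's bytes have only partially arrived, a streaming decoder simply stops at the last complete value and waits for more data, which is exactly the behavior UnpackrStream automates.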
// With structured cloning support
const cloningStream = new UnpackrStream({
structuredClone: true,
mapsAsObjects: true
});
// Network client example
import { connect } from "net";
const client = connect(8080, () => {
const unpackrStream = new UnpackrStream({ useRecords: true });
client.pipe(unpackrStream);
unpackrStream.on('data', (data) => {
console.log('Received from server:', data);
});
});
// Handle incomplete data gracefully
const robustStream = new UnpackrStream();
robustStream.on('error', (error) => {
if (error.incomplete) {
console.log('Incomplete data, will retry with more data');
} else {
console.error('Stream error:', error);
}
});

Combining PackrStream and UnpackrStream for full-duplex communication.
Usage Examples:
import { PackrStream, UnpackrStream } from "msgpackr";
import { connect } from "net";
// Client-side bidirectional streaming
const client = connect(8080, () => {
const packrStream = new PackrStream({ useRecords: true });
const unpackrStream = new UnpackrStream({ useRecords: true });
// Setup bidirectional pipes
packrStream.pipe(client);
client.pipe(unpackrStream);
// Send data to server
packrStream.write({ command: "login", user: "alice" });
packrStream.write({ command: "getData", id: 123 });
// Receive responses
unpackrStream.on('data', (response) => {
console.log('Server response:', response);
});
});
// Server-side echo example
import { createServer } from "net";
const server = createServer((socket) => {
const packrStream = new PackrStream();
const unpackrStream = new UnpackrStream();
// Setup echo pipeline
socket.pipe(unpackrStream);
packrStream.pipe(socket);
unpackrStream.on('data', (data) => {
console.log('Received:', data);
// Echo back with timestamp
packrStream.write({
echo: data,
timestamp: new Date(),
server: "echo-1"
});
});
});
server.listen(8080);

Advanced patterns for stream processing and transformation.
Usage Examples:
import { PackrStream, UnpackrStream } from "msgpackr";
import { Transform, pipeline } from "stream";
// Data transformation pipeline
const transformStream = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
// Transform the data
const transformed = {
...chunk,
processed: true,
timestamp: Date.now()
};
callback(null, transformed);
}
});
// Complete processing pipeline
pipeline(
inputStream, // Source: any Readable (e.g. a socket or file stream)
new UnpackrStream(), // Unpack binary data
transformStream, // Transform objects
new PackrStream(), // Pack back to binary
outputStream, // Destination: any Writable
(error) => {
if (error) {
console.error('Pipeline error:', error);
} else {
console.log('Pipeline completed successfully');
}
}
);
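The same pipeline can be written with Node's promise-based `pipeline` from `stream/promises` (available since Node 15), which avoids the trailing callback. This sketch substitutes in-memory stand-ins (`Readable.from` as the source, a collecting Writable as the destination) so it runs without msgpackr installed:

```javascript
import { pipeline } from "node:stream/promises";
import { Readable, Transform, Writable } from "node:stream";

// Run a transform pipeline over an array of objects and
// collect the transformed results.
async function runPipeline(items) {
  const results = [];
  await pipeline(
    Readable.from(items), // in-memory source
    new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        // same stamping transform as above
        callback(null, { ...chunk, processed: true, timestamp: Date.now() });
      },
    }),
    new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        results.push(chunk); // in-memory destination
        callback();
      },
    })
  );
  return results;
}
```

In a real deployment the Readable and Writable stand-ins would be replaced by the socket or file streams, with UnpackrStream and PackrStream at the pipeline's ends as in the callback version.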
// Filtering stream example
const filterStream = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
// Only pass through objects matching criteria
if (chunk.type === 'important') {
callback(null, chunk);
} else {
callback(); // Skip this object
}
}
});
// Multi-stage processing
const processingStream = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
// Async processing
processDataAsync(chunk)
.then(result => callback(null, result))
.catch(error => callback(error));
}
});

Streams provide robust error handling for incomplete or corrupted MessagePack data.
import { UnpackrStream } from "msgpackr";
const unpackrStream = new UnpackrStream();
unpackrStream.on('error', (error) => {
if (error.incomplete) {
// Incomplete MessagePack data - stream will handle automatically
console.log('Incomplete data at position:', error.lastPosition);
console.log('Successfully parsed values:', error.values);
// Stream continues processing when more data arrives
} else {
// Other errors (malformed data, etc.)
console.error('Stream error:', error.message);
// May need to restart or reset stream
}
});
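For unrecoverable errors, one common recovery pattern is to detach the failed decoder and pipe the source into a fresh one. The `attachDecoder` helper below is a hypothetical sketch of that pattern (not a msgpackr API), with PassThrough standing in for UnpackrStream so it runs with Node core alone:

```javascript
import { PassThrough } from "node:stream";

// Hypothetical helper: pipe `source` into a decoder produced by
// `makeDecoder`, replacing the decoder whenever it errors.
function attachDecoder(source, makeDecoder, onData) {
  let decoder;
  const start = () => {
    decoder = makeDecoder();
    decoder.on("data", onData);
    decoder.on("error", () => {
      source.unpipe(decoder); // detach the failed decoder
      start();                // attach a clean replacement
    });
    source.pipe(decoder);
  };
  start();
  return () => decoder; // expose the current decoder for inspection
}
```

With msgpackr, `makeDecoder` would be `() => new UnpackrStream(options)`; data written to the source after a reset flows into the replacement decoder.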
// Monitor stream health
unpackrStream.on('pipe', (src) => {
console.log('Stream connected to source');
});
unpackrStream.on('unpipe', (src) => {
console.log('Stream disconnected from source');
});

Stream-specific performance considerations and optimizations.
import { PackrStream, UnpackrStream } from "msgpackr";
// High-throughput configuration
const highThroughputOptions = {
useRecords: true,
sequential: true,
bundleStrings: true,
highWaterMark: 65536 // Larger buffer for high-volume streams
};
const packrStream = new PackrStream(highThroughputOptions);
const unpackrStream = new UnpackrStream(highThroughputOptions);
// Monitor performance
let processedCount = 0;
const startTime = Date.now();
unpackrStream.on('data', (data) => {
processedCount++;
if (processedCount % 1000 === 0) {
const elapsed = Date.now() - startTime;
const rate = processedCount / (elapsed / 1000);
console.log(`Processing rate: ${rate.toFixed(2)} objects/sec`);
}
});
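These highWaterMark values tie into Node's backpressure protocol: write() returns false once the stream's internal buffer reaches highWaterMark (counted in objects for object-mode streams), signaling the producer to pause until 'drain' fires. A minimal Node-core demonstration:

```javascript
import { Writable } from "node:stream";

// A deliberately slow sink with a tiny buffer so backpressure
// appears immediately; in object mode, highWaterMark counts objects.
const slowSink = new Writable({
  objectMode: true,
  highWaterMark: 2,
  write(chunk, encoding, callback) {
    setTimeout(callback, 10); // simulate slow consumption
  },
});

const first = slowSink.write({ n: 1 }); // buffer still below the mark
slowSink.write({ n: 2 });
const third = slowSink.write({ n: 3 }); // buffer full: returns false

slowSink.on("drain", () => {
  // the buffer has emptied; safe to resume writing
});
```

Honoring the false return value (or simply using pipe/pipeline, which handle it automatically) keeps memory bounded in high-volume streams; a larger highWaterMark just raises the point at which the pause signal appears.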