# Streaming

Transform streams for processing continuous data flows with automatic MessagePack encoding and decoding, optimized for Node.js stream-based applications.

## Capabilities

### PackrStream Class

Transform stream that converts JavaScript objects to MessagePack binary data, suitable for network transmission or file storage.

```javascript { .api }
/**
 * Transform stream for packing objects to MessagePack binary format
 * Extends Node.js Transform stream with object mode input and binary output
 */
class PackrStream extends Transform {
  constructor(options?: Options | StreamOptions);
}

interface StreamOptions {
  highWaterMark?: number;
  emitClose?: boolean;
  allowHalfOpen?: boolean;
}
```
**Usage Examples:**

```javascript
import { PackrStream } from "msgpackr";
import { createWriteStream } from "fs";

// Basic streaming to file
const packrStream = new PackrStream();
const fileStream = createWriteStream("data.msgpack");

packrStream.pipe(fileStream);

// Write objects to stream
packrStream.write({ id: 1, name: "Alice" });
packrStream.write({ id: 2, name: "Bob" });
packrStream.end();

// With custom options
const optimizedStream = new PackrStream({
  useRecords: true,
  sequential: true,
  highWaterMark: 16384
});

// Network streaming example
import { createServer } from "net";

const server = createServer((socket) => {
  const packrStream = new PackrStream({ useRecords: true });
  packrStream.pipe(socket);

  // Stream data to client
  const data = [
    { type: "user", id: 1, name: "Alice" },
    { type: "user", id: 2, name: "Bob" },
    { type: "message", text: "Hello World" }
  ];

  data.forEach(item => packrStream.write(item));
  packrStream.end();
});
```
### UnpackrStream Class

Transform stream that converts MessagePack binary data back to JavaScript objects, handling incomplete data and stream boundaries automatically.

```javascript { .api }
/**
 * Transform stream for unpacking MessagePack binary data to objects
 * Extends Node.js Transform stream with binary input and object mode output
 */
class UnpackrStream extends Transform {
  constructor(options?: Options | StreamOptions);
}
```
**Usage Examples:**

```javascript
import { UnpackrStream } from "msgpackr";
import { createReadStream } from "fs";

// Basic streaming from file
const unpackrStream = new UnpackrStream();
const fileStream = createReadStream("data.msgpack");

fileStream.pipe(unpackrStream);

unpackrStream.on('data', (object) => {
  console.log('Received object:', object);
});

// With structured cloning support
const cloningStream = new UnpackrStream({
  structuredClone: true,
  mapsAsObjects: true
});

// Network client example
import { connect } from "net";

const client = connect(8080, () => {
  const unpackrStream = new UnpackrStream({ useRecords: true });

  client.pipe(unpackrStream);

  unpackrStream.on('data', (data) => {
    console.log('Received from server:', data);
  });
});

// Handle incomplete data gracefully
const robustStream = new UnpackrStream();

robustStream.on('error', (error) => {
  if (error.incomplete) {
    console.log('Incomplete data, will retry with more data');
  } else {
    console.error('Stream error:', error);
  }
});
```
### Bidirectional Streaming

Combining PackrStream and UnpackrStream for full-duplex communication.

**Usage Examples:**

```javascript
import { PackrStream, UnpackrStream } from "msgpackr";
import { connect } from "net";

// Client-side bidirectional streaming
const client = connect(8080, () => {
  const packrStream = new PackrStream({ useRecords: true });
  const unpackrStream = new UnpackrStream({ useRecords: true });

  // Setup bidirectional pipes
  packrStream.pipe(client);
  client.pipe(unpackrStream);

  // Send data to server
  packrStream.write({ command: "login", user: "alice" });
  packrStream.write({ command: "getData", id: 123 });

  // Receive responses
  unpackrStream.on('data', (response) => {
    console.log('Server response:', response);
  });
});

// Server-side echo example
import { createServer } from "net";

const server = createServer((socket) => {
  const packrStream = new PackrStream();
  const unpackrStream = new UnpackrStream();

  // Setup echo pipeline
  socket.pipe(unpackrStream);
  packrStream.pipe(socket);

  unpackrStream.on('data', (data) => {
    console.log('Received:', data);
    // Echo back with timestamp
    packrStream.write({
      echo: data,
      timestamp: new Date(),
      server: "echo-1"
    });
  });
});

server.listen(8080);
```
### Stream Pipeline Patterns

Advanced patterns for stream processing and transformation.

**Usage Examples:**

```javascript
import { PackrStream, UnpackrStream } from "msgpackr";
import { Transform, pipeline } from "stream";

// Data transformation pipeline
const transformStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Transform the data
    const transformed = {
      ...chunk,
      processed: true,
      timestamp: Date.now()
    };
    callback(null, transformed);
  }
});

// Complete processing pipeline
pipeline(
  inputStream,          // Source data
  new UnpackrStream(),  // Unpack binary data
  transformStream,      // Transform objects
  new PackrStream(),    // Pack back to binary
  outputStream,         // Destination
  (error) => {
    if (error) {
      console.error('Pipeline error:', error);
    } else {
      console.log('Pipeline completed successfully');
    }
  }
);

// Filtering stream example
const filterStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Only pass through objects matching criteria
    if (chunk.type === 'important') {
      callback(null, chunk);
    } else {
      callback(); // Skip this object
    }
  }
});

// Multi-stage processing
const processingStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Async processing
    processDataAsync(chunk)
      .then(result => callback(null, result))
      .catch(error => callback(error));
  }
});
```
## Error Handling and Recovery

Streams provide robust error handling for incomplete or corrupted MessagePack data.

```javascript
import { UnpackrStream } from "msgpackr";

const unpackrStream = new UnpackrStream();

unpackrStream.on('error', (error) => {
  if (error.incomplete) {
    // Incomplete MessagePack data - stream will handle automatically
    console.log('Incomplete data at position:', error.lastPosition);
    console.log('Successfully parsed values:', error.values);
    // Stream continues processing when more data arrives
  } else {
    // Other errors (malformed data, etc.)
    console.error('Stream error:', error.message);
    // May need to restart or reset stream
  }
});

// Monitor stream health
unpackrStream.on('pipe', (src) => {
  console.log('Stream connected to source');
});

unpackrStream.on('unpipe', (src) => {
  console.log('Stream disconnected from source');
});
```
## Performance Optimization

Stream-specific performance considerations and optimizations.

```javascript
import { PackrStream, UnpackrStream } from "msgpackr";

// High-throughput configuration
const highThroughputOptions = {
  useRecords: true,
  sequential: true,
  bundleStrings: true,
  highWaterMark: 65536 // Larger buffer for high-volume streams
};

const packrStream = new PackrStream(highThroughputOptions);
const unpackrStream = new UnpackrStream(highThroughputOptions);

// Monitor performance
let processedCount = 0;
const startTime = Date.now();

unpackrStream.on('data', (data) => {
  processedCount++;
  if (processedCount % 1000 === 0) {
    const elapsed = Date.now() - startTime;
    const rate = processedCount / (elapsed / 1000);
    console.log(`Processing rate: ${rate.toFixed(2)} objects/sec`);
  }
});
```