# Streaming

Transform streams for processing continuous data flows with automatic MessagePack encoding and decoding, optimized for Node.js stream-based applications.

## Capabilities

### PackrStream Class

Transform stream that converts JavaScript objects to MessagePack binary data, suitable for network transmission or file storage.

```javascript { .api }
/**
 * Transform stream for packing objects to MessagePack binary format
 * Extends Node.js Transform stream with object mode input and binary output
 */
class PackrStream extends Transform {
  constructor(options?: Options | StreamOptions);
}

interface StreamOptions {
  highWaterMark?: number;
  emitClose?: boolean;
  allowHalfOpen?: boolean;
}
```

**Usage Examples:**

```javascript
import { PackrStream } from "msgpackr";
import { createWriteStream } from "fs";
import { createServer } from "net";

// Basic streaming to file
const packrStream = new PackrStream();
const fileStream = createWriteStream("data.msgpack");

packrStream.pipe(fileStream);

// Write objects to stream
packrStream.write({ id: 1, name: "Alice" });
packrStream.write({ id: 2, name: "Bob" });
packrStream.end();

// With custom options
const optimizedStream = new PackrStream({
  useRecords: true,
  sequential: true,
  highWaterMark: 16384
});

// Network streaming example
const server = createServer((socket) => {
  const packrStream = new PackrStream({ useRecords: true });
  packrStream.pipe(socket);

  // Stream data to client
  const data = [
    { type: "user", id: 1, name: "Alice" },
    { type: "user", id: 2, name: "Bob" },
    { type: "message", text: "Hello World" }
  ];

  data.forEach(item => packrStream.write(item));
  packrStream.end();
});

// Start accepting connections (same port as the client examples)
server.listen(8080);
```
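
The examples above ignore the return value of `write()`. When writing many objects, it is usually worth pausing once the stream's internal buffer is full and resuming on `'drain'`. The following is a minimal sketch of that pattern; `writeAll` is a hypothetical helper, not part of msgpackr, and it relies only on the standard Node.js writable interface that a Transform stream such as PackrStream exposes.

```javascript
import { once } from "node:events";

// Hypothetical helper (not part of msgpackr): writes every item to a
// writable stream and pauses whenever write() reports a full buffer.
async function writeAll(stream, items) {
  for (const item of items) {
    if (!stream.write(item)) {
      // Buffer reached highWaterMark; wait until it has drained
      await once(stream, "drain");
    }
  }
  // Signal that no more objects will be written
  stream.end();
}
```

With a PackrStream already piped to its destination, this could be called as `await writeAll(packrStream, records)`, where `records` is any iterable of objects.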

### UnpackrStream Class

Transform stream that converts MessagePack binary data back to JavaScript objects, handling incomplete data and stream boundaries automatically.

```javascript { .api }
/**
 * Transform stream for unpacking MessagePack binary data to objects
 * Extends Node.js Transform stream with binary input and object mode output
 */
class UnpackrStream extends Transform {
  constructor(options?: Options | StreamOptions);
}
```

**Usage Examples:**

```javascript
import { UnpackrStream } from "msgpackr";
import { createReadStream } from "fs";
import { connect } from "net";

// Basic streaming from file
const unpackrStream = new UnpackrStream();
const fileStream = createReadStream("data.msgpack");

fileStream.pipe(unpackrStream);

unpackrStream.on('data', (object) => {
  console.log('Received object:', object);
});

// With structured cloning support
const cloningStream = new UnpackrStream({
  structuredClone: true,
  mapsAsObjects: true
});

// Network client example
const client = connect(8080, () => {
  const unpackrStream = new UnpackrStream({ useRecords: true });

  client.pipe(unpackrStream);

  unpackrStream.on('data', (data) => {
    console.log('Received from server:', data);
  });
});

// Handle incomplete data gracefully
const robustStream = new UnpackrStream();

robustStream.on('error', (error) => {
  if (error.incomplete) {
    console.log('Incomplete data, will retry with more data');
  } else {
    console.error('Stream error:', error);
  }
});
```
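
As an alternative to a `'data'` listener, an object-mode readable stream can be consumed with `for await...of`, which surfaces stream errors as ordinary exceptions. The sketch below assumes the unpacking stream behaves like any other Node.js readable stream on its output side; `collectObjects` is a hypothetical helper, not part of msgpackr.

```javascript
// Hypothetical helper: drains an object-mode readable stream (for example,
// a file stream piped into an UnpackrStream) into an array.
async function collectObjects(readable, limit = Infinity) {
  const objects = [];
  for await (const object of readable) {
    objects.push(object);
    // Leaving the loop early destroys the underlying stream
    if (objects.length >= limit) break;
  }
  return objects;
}
```

For instance, `await collectObjects(fileStream.pipe(unpackrStream))` would gather every decoded object from the file example above, which is only sensible when the data fits in memory.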

### Bidirectional Streaming

Combine a PackrStream and an UnpackrStream on the same socket for full-duplex communication.

**Usage Examples:**

```javascript
import { PackrStream, UnpackrStream } from "msgpackr";
import { connect, createServer } from "net";

// Client-side bidirectional streaming
const client = connect(8080, () => {
  const packrStream = new PackrStream({ useRecords: true });
  const unpackrStream = new UnpackrStream({ useRecords: true });

  // Setup bidirectional pipes
  packrStream.pipe(client);
  client.pipe(unpackrStream);

  // Send data to server
  packrStream.write({ command: "login", user: "alice" });
  packrStream.write({ command: "getData", id: 123 });

  // Receive responses
  unpackrStream.on('data', (response) => {
    console.log('Server response:', response);
  });
});

// Server-side echo example
const server = createServer((socket) => {
  const packrStream = new PackrStream();
  const unpackrStream = new UnpackrStream();

  // Setup echo pipeline
  socket.pipe(unpackrStream);
  packrStream.pipe(socket);

  unpackrStream.on('data', (data) => {
    console.log('Received:', data);
    // Echo back with timestamp
    packrStream.write({
      echo: data,
      timestamp: new Date(),
      server: "echo-1"
    });
  });
});

server.listen(8080);
```

### Stream Pipeline Patterns

Advanced patterns for stream processing and transformation.

**Usage Examples:**

```javascript
import { PackrStream, UnpackrStream } from "msgpackr";
import { Transform, pipeline } from "stream";

// Data transformation pipeline
const transformStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Transform the data
    const transformed = {
      ...chunk,
      processed: true,
      timestamp: Date.now()
    };
    callback(null, transformed);
  }
});

// Complete processing pipeline
// (inputStream and outputStream are placeholders for your own binary
// source and destination streams)
pipeline(
  inputStream,         // Source data
  new UnpackrStream(), // Unpack binary data
  transformStream,     // Transform objects
  new PackrStream(),   // Pack back to binary
  outputStream,        // Destination
  (error) => {
    if (error) {
      console.error('Pipeline error:', error);
    } else {
      console.log('Pipeline completed successfully');
    }
  }
);

// Filtering stream example
const filterStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Only pass through objects matching criteria
    if (chunk.type === 'important') {
      callback(null, chunk);
    } else {
      callback(); // Skip this object
    }
  }
});

// Multi-stage processing
// (processDataAsync is a placeholder for your own promise-returning function)
const processingStream = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    // Async processing
    processDataAsync(chunk)
      .then(result => callback(null, result))
      .catch(error => callback(error));
  }
});
```
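
The callback form of `pipeline` shown above also has a promise-based variant in `stream/promises`. The sketch below is self-contained and uses only Node.js built-ins: `Readable.from` and an in-memory sink stand in for the source and destination, and the function names are illustrative. In a real pipeline the unpacking and packing stages would sit on either side of the object-mode transforms, as in the example above.

```javascript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Illustrative factory: passes through only objects of type "important"
const keepImportant = () => new Transform({
  objectMode: true,
  transform(chunk, _encoding, callback) {
    if (chunk.type === "important") {
      callback(null, chunk);
    } else {
      callback(); // Calling back without a value drops the chunk
    }
  }
});

// Illustrative factory: tags each object as processed
const markProcessed = () => new Transform({
  objectMode: true,
  transform(chunk, _encoding, callback) {
    callback(null, { ...chunk, processed: true });
  }
});

// Runs the stages over an in-memory list and resolves with the results;
// an error in any stage rejects the returned promise.
async function runPipeline(items) {
  const results = [];
  const sink = new Writable({
    objectMode: true,
    write(chunk, _encoding, callback) {
      results.push(chunk);
      callback();
    }
  });
  await pipeline(Readable.from(items), keepImportant(), markProcessed(), sink);
  return results;
}
```

Using factories rather than shared stream instances keeps each run independent, since a stream cannot be reused once it has ended.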

## Error Handling and Recovery

Streams provide robust error handling for incomplete or corrupted MessagePack data.

```javascript
import { UnpackrStream } from "msgpackr";

const unpackrStream = new UnpackrStream();

unpackrStream.on('error', (error) => {
  if (error.incomplete) {
    // Incomplete MessagePack data - stream will handle automatically
    console.log('Incomplete data at position:', error.lastPosition);
    console.log('Successfully parsed values:', error.values);
    // Stream continues processing when more data arrives
  } else {
    // Other errors (malformed data, etc.)
    console.error('Stream error:', error.message);
    // May need to restart or reset stream
  }
});

// Monitor stream health
unpackrStream.on('pipe', (src) => {
  console.log('Stream connected to source');
});

unpackrStream.on('unpipe', (src) => {
  console.log('Stream disconnected from source');
});
```

## Performance Optimization

Stream-specific performance considerations and optimizations.

```javascript
import { PackrStream, UnpackrStream } from "msgpackr";

// High-throughput configuration
const highThroughputOptions = {
  useRecords: true,
  sequential: true,
  bundleStrings: true,
  highWaterMark: 65536 // Larger buffer for high-volume streams
};

const packrStream = new PackrStream(highThroughputOptions);
const unpackrStream = new UnpackrStream(highThroughputOptions);

// Monitor performance
let processedCount = 0;
const startTime = Date.now();

unpackrStream.on('data', (data) => {
  processedCount++;
  if (processedCount % 1000 === 0) {
    const elapsed = Date.now() - startTime;
    const rate = processedCount / (elapsed / 1000);
    console.log(`Processing rate: ${rate.toFixed(2)} objects/sec`);
  }
});
```
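
The same counting logic can be packaged as a reusable pipeline stage, so it does not compete with other `'data'` listeners. This is a sketch using only Node.js built-ins; `createRateMeter` and its `interval` and `report` parameters are hypothetical names, not part of msgpackr.

```javascript
import { Transform } from "node:stream";

// Hypothetical helper: an object-mode stage that forwards every object
// unchanged and calls `report` once per `interval` objects.
function createRateMeter(interval = 1000, report = console.log) {
  let count = 0;
  const startTime = Date.now(); // Measured from creation of the meter
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      count++;
      if (count % interval === 0) {
        report({ count, elapsedMs: Date.now() - startTime });
      }
      callback(null, chunk); // Pass the object through untouched
    }
  });
}
```

It could be placed after the unpacking stage, for example `unpackrStream.pipe(createRateMeter())`, with the consumer reading from the meter's output.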