or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-functions.md, exceptions.md, index.md, streaming.md, types.md

streaming.md (in docs/)

0

# Streaming

1

2

Advanced classes for efficient streaming serialization and deserialization. These classes enable processing of large datasets, continuous data streams, and fine-grained control over the packing/unpacking process.

3

4

## Capabilities

5

6

### Packer Class

7

8

High-performance serializer for converting Python objects to MessagePack binary format with reusable buffer management and customizable encoding options.

9

10

```python { .api }

11

class Packer:

12

def __init__(

13

self,

14

*,

15

default=None,

16

use_single_float=False,

17

autoreset=True,

18

use_bin_type=True,

19

strict_types=False,

20

datetime=False,

21

unicode_errors=None,

22

buf_size=None

23

):

24

"""

25

MessagePack Packer for streaming serialization.

26

27

Parameters:

28

- default: callable, convert unsupported types

29

- use_single_float: bool, use 32-bit floats (default: False)

30

- autoreset: bool, reset buffer after each pack (default: True)

31

- use_bin_type: bool, use bin type for bytes (default: True)

32

- strict_types: bool, check exact types (default: False)

33

- datetime: bool, pack datetime as Timestamp (default: False)

34

- unicode_errors: str, Unicode error handling (default: None, uses 'strict')

35

- buf_size: int, internal buffer size (C extension only, default: None)

36

"""

37

38

def pack(self, obj):

39

"""

40

Pack object and return bytes (if autoreset=True) or None.

41

42

Parameters:

43

- obj: Python object to pack

44

45

Returns:

46

bytes or None: Packed data if autoreset=True, else None

47

48

Raises:

49

TypeError: When object cannot be serialized

50

OverflowError: When numbers are too large

51

"""

52

53

def pack_array_header(self, n):

54

"""

55

Pack array header for n elements.

56

57

Parameters:

58

- n: int, number of array elements

59

60

Returns:

61

bytes or None: Packed header

62

"""

63

64

def pack_map_header(self, n):

65

"""

66

Pack map header for n key-value pairs.

67

68

Parameters:

69

- n: int, number of map pairs

70

71

Returns:

72

bytes or None: Packed header

73

"""

74

75

def pack_map_pairs(self, pairs):

76

"""

77

Pack sequence of key-value pairs as map.

78

79

Parameters:

80

- pairs: iterable of (key, value) pairs

81

82

Returns:

83

bytes or None: Packed map data

84

"""

85

86

def pack_ext_type(self, typecode, data):

87

"""

88

Pack extension type with custom typecode.

89

90

Parameters:

91

- typecode: int, extension type code (0-127)

92

- data: bytes, extension data

93

94

Returns:

95

bytes or None: Packed extension type

96

"""

97

98

def bytes(self):

99

"""

100

Get packed bytes when autoreset=False.

101

102

Returns:

103

bytes: All packed data in buffer

104

"""

105

106

def reset(self):

107

"""

108

Clear internal buffer.

109

110

Returns:

111

None

112

"""

113

114

def getbuffer(self):

115

"""

116

Get a view of the internal buffer without copying it.

117

118

Returns:

119

memoryview: View of the current buffer contents (use bytes() for a bytes copy)

120

"""

121

```

122

123

### Unpacker Class

124

125

Streaming deserializer for processing MessagePack data from files, sockets, or incremental data feeds with advanced control options.

126

127

```python { .api }

128

class Unpacker:

129

def __init__(

130

self,

131

file_like=None,

132

*,

133

read_size=0,

134

use_list=True,

135

raw=False,

136

timestamp=0,

137

strict_map_key=True,

138

object_hook=None,

139

object_pairs_hook=None,

140

list_hook=None,

141

unicode_errors=None,

142

max_buffer_size=100 * 1024 * 1024,

143

ext_hook=ExtType,

144

max_str_len=-1,

145

max_bin_len=-1,

146

max_array_len=-1,

147

max_map_len=-1,

148

max_ext_len=-1

149

):

150

"""

151

Streaming unpacker for MessagePack data.

152

153

Parameters:

154

- file_like: file-like object with read() method

155

- read_size: int, bytes to read per operation (default: 0, uses min(16KB, max_buffer_size))

156

- use_list: bool, use list instead of tuple (default: True)

157

- raw: bool, return bytes instead of str (default: False)

158

- timestamp: int, timestamp unpacking mode (0-3, default: 0)

159

- strict_map_key: bool, restrict map key types (default: True)

160

- object_hook: callable, hook for dict objects

161

- object_pairs_hook: callable, hook for key-value pairs

162

- list_hook: callable, hook for list objects (default: None)

163

- unicode_errors: str, Unicode error handling (default: None, uses 'strict')

164

- max_buffer_size: int, buffer size limit (default: 100MB)

165

- ext_hook: callable, extension type handler (default: ExtType)

166

- max_str_len: int, max string length (deprecated)

167

- max_bin_len: int, max binary length (deprecated)

168

- max_array_len: int, max array length

169

- max_map_len: int, max map length

170

- max_ext_len: int, max extension length (deprecated)

171

"""

172

173

def feed(self, next_bytes):

174

"""

175

Feed bytes to unpacker for processing.

176

177

Parameters:

178

- next_bytes: bytes, data to add to buffer

179

180

Returns:

181

None

182

183

Raises:

184

BufferFull: When buffer size limit exceeded

185

"""

186

187

def unpack(self):

188

"""

189

Unpack and return one object from buffer.

190

191

Returns:

192

object: Next unpacked object

193

194

Raises:

195

OutOfData: When buffer is incomplete

196

FormatError: When data format is invalid

197

StackError: When data is too nested

198

"""

199

200

def skip(self):

201

"""

202

Skip one object in buffer without unpacking.

203

204

Returns:

205

None

206

207

Raises:

208

OutOfData: When buffer is incomplete

209

FormatError: When data format is invalid

210

"""

211

212

def read_array_header(self):

213

"""

214

Read array header and return number of elements.

215

216

Returns:

217

int: Number of array elements

218

219

Raises:

220

OutOfData: When buffer is incomplete

221

ValueError: When the next object is not an array

222

"""

223

224

def read_map_header(self):

225

"""

226

Read map header and return number of key-value pairs.

227

228

Returns:

229

int: Number of map pairs

230

231

Raises:

232

OutOfData: When buffer is incomplete

233

ValueError: When the next object is not a map

234

"""

235

236

def read_bytes(self, n):

237

"""

238

Read n bytes from buffer.

239

240

Parameters:

241

- n: int, number of bytes to read

242

243

Returns:

244

bytes: Read data

245

246

Raises:

247

OutOfData: When insufficient data available

248

"""

249

250

def tell(self):

251

"""

252

Get current stream position.

253

254

Returns:

255

int: Stream offset in bytes

256

"""

257

258

def __iter__(self):

259

"""

260

Iterator protocol support.

261

262

Returns:

263

self: Iterator object

264

"""

265

266

def __next__(self):

267

"""

268

Get next unpacked object.

269

270

Returns:

271

object: Next unpacked object

272

273

Raises:

274

StopIteration: When no more objects available

275

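Note: OutOfData is not propagated during iteration; an incomplete buffer ends iteration with StopIteration (feed more data, then iterate again)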

276

"""

277

```

278

279

## Usage Examples

280

281

### Reusable Packer

282

283

```python

284

import msgpack

285

286

# Create packer with custom settings

287

packer = msgpack.Packer(

288

autoreset=False,

289

use_single_float=True,

290

strict_types=True

291

)

292

293

# Pack multiple objects

294

packer.pack({'type': 'user', 'id': 1})

295

packer.pack({'type': 'user', 'id': 2})

296

packer.pack({'type': 'user', 'id': 3})

297

298

# Get all packed data

299

all_data = packer.bytes()

300

301

# Save to file

302

with open('users.msgpack', 'wb') as f:

303

f.write(all_data)

304

305

packer.reset() # Clear buffer for reuse

306

```

307

308

### Streaming from File

309

310

```python

311

import msgpack

312

313

# Unpack multiple objects from file

314

with open('users.msgpack', 'rb') as f:

315

unpacker = msgpack.Unpacker(f, use_list=True)

316

for user in unpacker:

317

print(f"User {user['id']}: {user['type']}")

318

```

319

320

### Socket Streaming

321

322

```python

323

import socket

324

import msgpack

325

326

# Streaming from network socket

327

sock = socket.socket()

328

sock.connect(('localhost', 8080))

329

330

unpacker = msgpack.Unpacker(raw=False, max_buffer_size=1024*1024)

331

332

while True:

333

data = sock.recv(4096)

334

if not data:

335

break

336

337

unpacker.feed(data)

338

339

try:

340

while True:

341

obj = unpacker.unpack()

342

process_message(obj)

343

except msgpack.OutOfData:

344

# Need more data

345

continue

346

```

347

348

### Manual Array Processing

349

350

```python

351

import msgpack

352

353

# Process large arrays element by element

354

data = msgpack.packb([1, 2, 3, 4, 5] * 1000)

355

unpacker = msgpack.Unpacker()

356

unpacker.feed(data)

357

358

# Read array header

359

array_len = unpacker.read_array_header()

360

print(f"Array has {array_len} elements")

361

362

# Process elements individually

363

for i in range(array_len):

364

element = unpacker.unpack()

365

if i % 1000 == 0:

366

print(f"Processing element {i}: {element}")

367

```