0
# Streaming
1
2
Advanced classes for efficient streaming serialization and deserialization. These classes enable processing of large datasets, continuous data streams, and fine-grained control over the packing/unpacking process.
3
4
## Capabilities
5
6
### Packer Class
7
8
High-performance serializer for converting Python objects to MessagePack binary format with reusable buffer management and customizable encoding options.
9
10
```python { .api }
11
class Packer:
12
def __init__(
13
self,
14
*,
15
default=None,
16
use_single_float=False,
17
autoreset=True,
18
use_bin_type=True,
19
strict_types=False,
20
datetime=False,
21
unicode_errors=None,
22
buf_size=None
23
):
24
"""
25
MessagePack Packer for streaming serialization.
26
27
Parameters:
28
- default: callable, convert unsupported types
29
- use_single_float: bool, use 32-bit floats (default: False)
30
- autoreset: bool, reset buffer after each pack (default: True)
31
- use_bin_type: bool, use bin type for bytes (default: True)
32
- strict_types: bool, check exact types (default: False)
33
- datetime: bool, pack timezone-aware datetime.datetime objects (tzinfo set) as Timestamp (default: False)
34
- unicode_errors: str, Unicode error handling (default: None, uses 'strict')
35
- buf_size: int, internal buffer size (C extension only, default: None)
36
"""
37
38
def pack(self, obj):
39
"""
40
Pack object and return bytes (if autoreset=True) or None.
41
42
Parameters:
43
- obj: Python object to pack
44
45
Returns:
46
bytes or None: Packed data if autoreset=True, else None
47
48
Raises:
49
TypeError: When object cannot be serialized
50
OverflowError: When numbers are too large
51
"""
52
53
def pack_array_header(self, n):
54
"""
55
Pack array header for n elements.
56
57
Parameters:
58
- n: int, number of array elements
59
60
Returns:
61
bytes or None: Packed header
62
"""
63
64
def pack_map_header(self, n):
65
"""
66
Pack map header for n key-value pairs.
67
68
Parameters:
69
- n: int, number of map pairs
70
71
Returns:
72
bytes or None: Packed header
73
"""
74
75
def pack_map_pairs(self, pairs):
76
"""
77
Pack sequence of key-value pairs as map.
78
79
Parameters:
80
- pairs: iterable of (key, value) pairs
81
82
Returns:
83
bytes or None: Packed map data
84
"""
85
86
def pack_ext_type(self, typecode, data):
87
"""
88
Pack extension type with custom typecode.
89
90
Parameters:
91
- typecode: int, extension type code (0-127)
92
- data: bytes, extension data
93
94
Returns:
95
bytes or None: Packed extension type
96
"""
97
98
def bytes(self):
99
"""
100
Get packed bytes when autoreset=False.
101
102
Returns:
103
bytes: All packed data in buffer
104
"""
105
106
def reset(self):
107
"""
108
Clear internal buffer.
109
110
Returns:
111
None
112
"""
113
114
def getbuffer(self):
115
"""
116
Get a view of the internal buffer without copying it.
117
118
Returns:
119
memoryview: Zero-copy view of the current buffer contents (use bytes() for a copy)
120
"""
121
```
122
123
### Unpacker Class
124
125
Streaming deserializer for processing MessagePack data from files, sockets, or incremental data feeds with advanced control options.
126
127
```python { .api }
128
class Unpacker:
129
def __init__(
130
self,
131
file_like=None,
132
*,
133
read_size=0,
134
use_list=True,
135
raw=False,
136
timestamp=0,
137
strict_map_key=True,
138
object_hook=None,
139
object_pairs_hook=None,
140
list_hook=None,
141
unicode_errors=None,
142
max_buffer_size=100 * 1024 * 1024,
143
ext_hook=ExtType,
144
max_str_len=-1,
145
max_bin_len=-1,
146
max_array_len=-1,
147
max_map_len=-1,
148
max_ext_len=-1
149
):
150
"""
151
Streaming unpacker for MessagePack data.
152
153
Parameters:
154
- file_like: file-like object with read() method
155
- read_size: int, bytes to read per operation (default: 0, uses min(16KB, max_buffer_size))
156
- use_list: bool, use list instead of tuple (default: True)
157
- raw: bool, return bytes instead of str (default: False)
158
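- timestamp: int, how Timestamp values are unpacked: 0 = Timestamp object, 1 = float (seconds since epoch), 2 = int (nanoseconds since epoch), 3 = datetime.datetime in UTC (default: 0)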
159
- strict_map_key: bool, accept only str or bytes as map keys (default: True)
160
- object_hook: callable, hook for dict objects
161
- object_pairs_hook: callable, hook for key-value pairs
162
- list_hook: callable, hook for list objects (default: None)
163
- unicode_errors: str, Unicode error handling (default: None, uses 'strict')
164
- max_buffer_size: int, buffer size limit (default: 100MB)
165
- ext_hook: callable, extension type handler (default: ExtType)
166
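- max_str_len: int, max string length (deprecated, use max_buffer_size instead; default: max_buffer_size)
167
- max_bin_len: int, max binary length (deprecated, use max_buffer_size instead; default: max_buffer_size)
168
- max_array_len: int, max array length (default: max_buffer_size)
169
- max_map_len: int, max map length (default: max_buffer_size // 2)
170
- max_ext_len: int, max extension length (deprecated, use max_buffer_size instead; default: max_buffer_size)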
171
"""
172
173
def feed(self, next_bytes):
174
"""
175
Feed bytes to unpacker for processing.
176
177
Parameters:
178
- next_bytes: bytes, data to add to buffer
179
180
Returns:
181
None
182
183
Raises:
184
BufferFull: When buffer size limit exceeded
185
"""
186
187
def unpack(self):
188
"""
189
Unpack and return one object from buffer.
190
191
Returns:
192
object: Next unpacked object
193
194
Raises:
195
OutOfData: When buffer is incomplete
196
FormatError: When data format is invalid
197
StackError: When data is too nested
198
"""
199
200
def skip(self):
201
"""
202
Skip one object in buffer without unpacking.
203
204
Returns:
205
None
206
207
Raises:
208
OutOfData: When buffer is incomplete
209
FormatError: When data format is invalid
210
"""
211
212
def read_array_header(self):
213
"""
214
Read array header and return number of elements.
215
216
Returns:
217
int: Number of array elements
218
219
Raises:
220
OutOfData: When buffer is incomplete
221
ValueError: When the next object is not an array
222
"""
223
224
def read_map_header(self):
225
"""
226
Read map header and return number of key-value pairs.
227
228
Returns:
229
int: Number of map pairs
230
231
Raises:
232
OutOfData: When buffer is incomplete
233
ValueError: When the next object is not a map
234
"""
235
236
def read_bytes(self, n):
237
"""
238
Read n bytes from buffer.
239
240
Parameters:
241
- n: int, number of bytes to read
242
243
Returns:
244
bytes: Read data
245
246
Note:
247
Does not raise OutOfData: if fewer than n bytes are available, the returned bytes are shorter than n
248
"""
249
250
def tell(self):
251
"""
252
Get current stream position.
253
254
Returns:
255
int: Stream offset in bytes
256
"""
257
258
def __iter__(self):
259
"""
260
Iterator protocol support.
261
262
Returns:
263
self: Iterator object
264
"""
265
266
def __next__(self):
267
"""
268
Get next unpacked object.
269
270
Returns:
271
object: Next unpacked object
272
273
Raises:
274
StopIteration: When no complete object is available (incomplete data ends iteration instead of raising OutOfData)
275
FormatError: When data format is invalid
276
"""
277
```
278
279
## Usage Examples
280
281
### Reusable Packer
282
283
```python
284
import msgpack
285
286
# Create packer with custom settings
287
packer = msgpack.Packer(
288
autoreset=False,
289
use_single_float=True,
290
strict_types=True
291
)
292
293
# Pack multiple objects
294
packer.pack({'type': 'user', 'id': 1})
295
packer.pack({'type': 'user', 'id': 2})
296
packer.pack({'type': 'user', 'id': 3})
297
298
# Get all packed data
299
all_data = packer.bytes()
300
301
# Save to file
302
with open('users.msgpack', 'wb') as f:
303
f.write(all_data)
304
305
packer.reset() # Clear buffer for reuse
306
```
307
308
### Streaming from File
309
310
```python
311
import msgpack
312
313
# Unpack multiple objects from file
314
with open('users.msgpack', 'rb') as f:
315
unpacker = msgpack.Unpacker(f, use_list=True)
316
for user in unpacker:
317
print(f"User {user['id']}: {user['type']}")
318
```
319
320
### Socket Streaming
321
322
```python
323
import socket
324
import msgpack
325
326
# Streaming from network socket
327
sock = socket.socket()
328
sock.connect(('localhost', 8080))
329
330
unpacker = msgpack.Unpacker(raw=False, max_buffer_size=1024*1024)
331
332
while True:
333
data = sock.recv(4096)
334
if not data:
335
break
336
337
unpacker.feed(data)
338
339
try:
340
while True:
341
obj = unpacker.unpack()
342
process_message(obj)
343
except msgpack.OutOfData:
344
# Need more data
345
continue
346
```
347
348
### Manual Array Processing
349
350
```python
351
import msgpack
352
353
# Process large arrays element by element
354
data = msgpack.packb([1, 2, 3, 4, 5] * 1000)
355
unpacker = msgpack.Unpacker()
356
unpacker.feed(data)
357
358
# Read array header
359
array_len = unpacker.read_array_header()
360
print(f"Array has {array_len} elements")
361
362
# Process elements individually
363
for i in range(array_len):
364
element = unpacker.unpack()
365
if i % 1000 == 0:
366
print(f"Processing element {i}: {element}")
367
```