MessagePack serializer for efficient binary data exchange among multiple languages
—
Advanced classes for efficient streaming serialization and deserialization. These classes enable processing of large datasets, continuous data streams, and fine-grained control over the packing/unpacking process.
High-performance serializer for converting Python objects to MessagePack binary format with reusable buffer management and customizable encoding options.
class Packer:
    """
    Serializer that packs Python objects into MessagePack binary data.

    With autoreset=True (the default) every pack call returns its own bytes.
    With autoreset=False the output accumulates in the internal buffer until
    it is read with bytes() / getbuffer() and cleared with reset().
    """

    def __init__(
        self,
        *,
        default=None,
        use_single_float=False,
        autoreset=True,
        use_bin_type=True,
        strict_types=False,
        datetime=False,
        unicode_errors=None,
        buf_size=None
    ):
        """
        MessagePack Packer for streaming serialization.

        All options are keyword-only.

        Parameters:
        - default: callable, convert unsupported types
        - use_single_float: bool, use 32-bit floats (default: False)
        - autoreset: bool, reset buffer after each pack (default: True)
        - use_bin_type: bool, use bin type for bytes (default: True)
        - strict_types: bool, check exact types (default: False)
        - datetime: bool, pack datetime as Timestamp (default: False)
        - unicode_errors: str, Unicode error handling (default: None, uses 'strict')
        - buf_size: int, internal buffer size (C extension only, default: None)
        """

    def pack(self, obj):
        """
        Pack object and return bytes (if autoreset=True) or None.

        Parameters:
        - obj: Python object to pack

        Returns:
        bytes or None: Packed data if autoreset=True, else None
        (the data then stays in the internal buffer)

        Raises:
        TypeError: When object cannot be serialized
        OverflowError: When numbers are too large
        """

    def pack_array_header(self, n):
        """
        Pack array header for n elements.

        Only the header is written; the n elements must be packed
        afterwards with separate pack calls.

        Parameters:
        - n: int, number of array elements

        Returns:
        bytes or None: Packed header if autoreset=True, else None
        """

    def pack_map_header(self, n):
        """
        Pack map header for n key-value pairs.

        Only the header is written; the n keys and n values must be packed
        afterwards with separate pack calls.

        Parameters:
        - n: int, number of map pairs

        Returns:
        bytes or None: Packed header if autoreset=True, else None
        """

    def pack_map_pairs(self, pairs):
        """
        Pack sequence of key-value pairs as map.

        Parameters:
        - pairs: iterable of (key, value) pairs

        Returns:
        bytes or None: Packed map data if autoreset=True, else None
        """

    def pack_ext_type(self, typecode, data):
        """
        Pack extension type with custom typecode.

        Parameters:
        - typecode: int, extension type code (0-127)
        - data: bytes, extension data

        Returns:
        bytes or None: Packed extension type if autoreset=True, else None
        """

    def bytes(self):
        """
        Get packed bytes when autoreset=False.

        Returns:
        bytes: All packed data in buffer
        """

    def reset(self):
        """
        Clear internal buffer.

        Returns:
        None
        """

    def getbuffer(self):
        """
        Get a view of the internal buffer.

        Upstream msgpack documents this as "Return view of internal buffer":
        the result is a buffer view (memoryview), not a bytes object.
        Use bytes() when a bytes copy is needed.

        Returns:
        memoryview: View of the current buffer contents
        """
Streaming deserializer for processing MessagePack data from files, sockets, or incremental data feeds with advanced control options.
class Unpacker:
    """
    Streaming deserializer for MessagePack data.

    Data comes either from a file-like object passed to the constructor or
    from bytes supplied incrementally through feed(). Objects are retrieved
    one at a time with unpack() or by iterating over the unpacker.
    """

    def __init__(
        self,
        file_like=None,
        *,
        read_size=0,
        use_list=True,
        raw=False,
        timestamp=0,
        strict_map_key=True,
        object_hook=None,
        object_pairs_hook=None,
        list_hook=None,
        unicode_errors=None,
        max_buffer_size=100 * 1024 * 1024,
        ext_hook=ExtType,
        max_str_len=-1,
        max_bin_len=-1,
        max_array_len=-1,
        max_map_len=-1,
        max_ext_len=-1
    ):
        """
        Streaming unpacker for MessagePack data.

        Every option except file_like is keyword-only.

        Parameters:
        - file_like: file-like object with read() method (default: None,
          data is then supplied through feed())
        - read_size: int, bytes to read per operation (default: 0, uses min(16KB, max_buffer_size))
        - use_list: bool, use list instead of tuple (default: True)
        - raw: bool, return bytes instead of str (default: False)
        - timestamp: int, timestamp unpacking mode (0-3, default: 0).
          Upstream modes: 0 = Timestamp object, 1 = float seconds since the
          epoch, 2 = int nanoseconds since the epoch, 3 = datetime.datetime (UTC)
        - strict_map_key: bool, restrict map key types (default: True)
        - object_hook: callable, hook for dict objects
        - object_pairs_hook: callable, hook for key-value pairs
        - list_hook: callable, hook for list objects (default: None)
        - unicode_errors: str, Unicode error handling (default: None, uses 'strict')
        - max_buffer_size: int, buffer size limit (default: 100MB)
        - ext_hook: callable, extension type handler (default: ExtType)
        - max_str_len: int, max string length (deprecated)
        - max_bin_len: int, max binary length (deprecated)
        - max_array_len: int, max array length
        - max_map_len: int, max map length
        - max_ext_len: int, max extension length (deprecated)
        """

    def feed(self, next_bytes):
        """
        Feed bytes to unpacker for processing.

        Parameters:
        - next_bytes: bytes, data to add to buffer

        Returns:
        None

        Raises:
        BufferFull: When buffer size limit exceeded
        """

    def unpack(self):
        """
        Unpack and return one object from buffer.

        Returns:
        object: Next unpacked object

        Raises:
        OutOfData: When buffer is incomplete
        FormatError: When data format is invalid
        StackError: When data is too nested
        """

    def skip(self):
        """
        Skip one object in buffer without unpacking.

        Returns:
        None

        Raises:
        OutOfData: When buffer is incomplete
        FormatError: When data format is invalid
        """

    def read_array_header(self):
        """
        Read array header and return number of elements.

        The elements themselves are not consumed; that many unpack() calls
        are needed afterwards to read them.

        Returns:
        int: Number of array elements

        Raises:
        OutOfData: When buffer is incomplete
        FormatError: When not an array header
        (NOTE(review): upstream appears to raise a plain ValueError, the
        base class of FormatError, for this case - confirm before relying
        on `except FormatError`)
        """

    def read_map_header(self):
        """
        Read map header and return number of key-value pairs.

        The pairs themselves are not consumed; twice that many unpack()
        calls are needed afterwards to read the keys and values.

        Returns:
        int: Number of map pairs

        Raises:
        OutOfData: When buffer is incomplete
        FormatError: When not a map header
        (NOTE(review): upstream appears to raise a plain ValueError, the
        base class of FormatError, for this case - confirm before relying
        on `except FormatError`)
        """

    def read_bytes(self, n):
        """
        Read n bytes from buffer.

        Parameters:
        - n: int, number of bytes to read

        Returns:
        bytes: Read data

        Raises:
        OutOfData: When insufficient data available
        (NOTE(review): upstream names this parameter `nbytes` and appears
        to return fewer bytes instead of raising when the buffer is short -
        confirm against the msgpack API reference)
        """

    def tell(self):
        """
        Get current stream position.

        Returns:
        int: Stream offset in bytes
        """

    def __iter__(self):
        """
        Iterator protocol support.

        Returns:
        self: Iterator object
        """

    def __next__(self):
        """
        Get next unpacked object.

        Returns:
        object: Next unpacked object

        Raises:
        StopIteration: When no more complete objects are available.
        Upstream ends iteration with StopIteration when the buffer holds
        only an incomplete object; OutOfData is raised by unpack(), not
        by iteration.
        """
import msgpack
# Build a packer that accumulates output instead of returning it per call
packer = msgpack.Packer(
    use_single_float=True,
    strict_types=True,
    autoreset=False,
)

# Append several records to the packer's internal buffer
for user_id in (1, 2, 3):
    packer.pack({'type': 'user', 'id': user_id})

# Collect everything packed so far
packed_users = packer.bytes()

# Persist the accumulated bytes
with open('users.msgpack', 'wb') as out_file:
    out_file.write(packed_users)

# Empty the buffer so the packer can be reused
packer.reset()

import msgpack
# Stream every packed object out of the file, one at a time
with open('users.msgpack', 'rb') as source:
    for user in msgpack.Unpacker(source, use_list=True):
        print(f"User {user['id']}: {user['type']}")

import socket
import msgpack

# Streaming from a network socket.
# The `with` block guarantees the socket is closed on every exit path,
# whether the peer ends the stream or message handling raises.
with socket.socket() as sock:
    sock.connect(('localhost', 8080))

    # No file_like is given, so data is supplied through feed().
    # The buffer limit is set to 1 MiB.
    unpacker = msgpack.Unpacker(raw=False, max_buffer_size=1024*1024)

    while True:
        data = sock.recv(4096)
        if not data:
            # Empty read: the peer closed the connection
            break
        unpacker.feed(data)

        # Drain every complete message currently buffered. Iteration stops
        # quietly when only a partial message remains; those bytes stay in
        # the buffer and are completed by the next feed(). Iterating keeps
        # exceptions raised inside the handler from being mistaken for
        # "need more data".
        for obj in unpacker:
            # process_message is the application's own handler
            # (not defined in this snippet)
            process_message(obj)

import msgpack
# Process large arrays element by element
data = msgpack.packb([1, 2, 3, 4, 5] * 1000)
unpacker = msgpack.Unpacker()
unpacker.feed(data)
# Read array header
array_len = unpacker.read_array_header()
print(f"Array has {array_len} elements")
# Process elements individually
for i in range(array_len):
element = unpacker.unpack()
if i % 1000 == 0:
print(f"Processing element {i}: {element}")Install with Tessl CLI
npx tessl i tessl/pypi-msgpack