CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-msgpack

MessagePack serializer for efficient binary data exchange among multiple languages

Pending
Overview
Eval results
Files

docs/streaming.md

Streaming

Advanced classes for efficient streaming serialization and deserialization. These classes enable processing of large datasets, continuous data streams, and fine-grained control over the packing/unpacking process.

Capabilities

Packer Class

High-performance serializer for converting Python objects to MessagePack binary format with reusable buffer management and customizable encoding options.

class Packer:
    """
    Streaming MessagePack serializer with a reusable internal buffer.

    With autoreset=True (the default) each pack call hands back the packed
    bytes and clears the buffer. With autoreset=False the output accumulates
    in the buffer until it is read with bytes() or getbuffer() and cleared
    with reset().
    """

    def __init__(
        self,
        *,
        default=None,
        use_single_float=False,
        autoreset=True,
        use_bin_type=True,
        strict_types=False,
        datetime=False,
        unicode_errors=None,
        buf_size=None
    ):
        """
        Create a packer. All options are keyword-only.

        Parameters:
        - default: callable invoked to convert objects of unsupported types
          (default: None)
        - use_single_float: bool, pack floats as 32-bit instead of 64-bit
          (default: False)
        - autoreset: bool, clear the buffer after each pack call and return
          the packed bytes from that call (default: True)
        - use_bin_type: bool, pack bytes using the bin type (default: True)
        - strict_types: bool, check exact types when packing (default: False)
        - datetime: bool, pack datetime objects as Timestamp (default: False)
        - unicode_errors: str, error handler used when encoding str
          (default: None, which means 'strict')
        - buf_size: int, internal buffer size; only used by the C extension
          (default: None)
        """

    def pack(self, obj):
        """
        Serialize one object into the internal buffer.

        Parameters:
        - obj: Python object to pack

        Returns:
        bytes or None: the packed data if autoreset=True; otherwise None,
        and the data stays in the buffer until bytes()/getbuffer() is called

        Raises:
        TypeError: When object cannot be serialized
        OverflowError: When numbers are too large
        """

    def pack_array_header(self, n):
        """
        Pack only the header of an array that will hold n elements.

        The header carries just the length; the caller is expected to pack
        the n elements afterwards.

        Parameters:
        - n: int, number of array elements

        Returns:
        bytes or None: Packed header (None when the data is kept in the
        buffer, as for pack())
        """

    def pack_map_header(self, n):
        """
        Pack only the header of a map that will hold n key-value pairs.

        The header carries just the pair count; the caller is expected to
        pack the n keys and n values afterwards.

        Parameters:
        - n: int, number of map pairs

        Returns:
        bytes or None: Packed header (None when the data is kept in the
        buffer, as for pack())
        """

    def pack_map_pairs(self, pairs):
        """
        Pack a sequence of key-value pairs as a single map.

        Parameters:
        - pairs: iterable of (key, value) pairs

        Returns:
        bytes or None: Packed map data (None when the data is kept in the
        buffer, as for pack())
        """

    def pack_ext_type(self, typecode, data):
        """
        Pack an extension-type value with an application-defined typecode.

        Parameters:
        - typecode: int, extension type code (0-127)
        - data: bytes, extension data

        Returns:
        bytes or None: Packed extension type

        NOTE(review): upstream msgpack implementations appear to leave the
        packed data in the buffer and return None here even with
        autoreset=True - confirm before relying on the return value.
        """

    def bytes(self):
        """
        Return everything currently held in the buffer.

        This is how packed output is retrieved when autoreset=False.

        Returns:
        bytes: All packed data in buffer
        """

    def reset(self):
        """
        Clear the internal buffer so the packer can be reused.

        Returns:
        None
        """

    def getbuffer(self):
        """
        Get the current contents of the internal buffer.

        Returns:
        bytes: Current buffer contents

        NOTE(review): upstream msgpack documents this as returning a view
        of the internal buffer (a memoryview-style object), not a bytes
        copy - confirm and prefer bytes() when a bytes object is required.
        """

Unpacker Class

Streaming deserializer for processing MessagePack data from files, sockets, or incremental data feeds with advanced control options.

class Unpacker:
    """
    Streaming MessagePack deserializer.

    Input comes either from a file-like object given at construction time
    or from byte chunks pushed in with feed(). Objects are pulled out one
    at a time with unpack() or by iterating over the unpacker.
    """

    def __init__(
        self,
        file_like=None,
        *,
        read_size=0,
        use_list=True,
        raw=False,
        timestamp=0,
        strict_map_key=True,
        object_hook=None,
        object_pairs_hook=None,
        list_hook=None,
        unicode_errors=None,
        max_buffer_size=100 * 1024 * 1024,
        ext_hook=ExtType,
        max_str_len=-1,
        max_bin_len=-1,
        max_array_len=-1,
        max_map_len=-1,
        max_ext_len=-1
    ):
        """
        Create a streaming unpacker. Every option after file_like is
        keyword-only.

        Parameters:
        - file_like: file-like object with read() method (default: None;
          supply data through feed() instead)
        - read_size: int, bytes to read per operation (default: 0, uses
          min(16KB, max_buffer_size))
        - use_list: bool, unpack arrays as list instead of tuple
          (default: True)
        - raw: bool, return bytes instead of str (default: False)
        - timestamp: int, timestamp unpacking mode (0-3, default: 0)
          NOTE(review): upstream docs list the modes as 0 = Timestamp,
          1 = float seconds, 2 = int nanoseconds, 3 = datetime.datetime
          (UTC) - confirm.
        - strict_map_key: bool, restrict map key types (default: True)
        - object_hook: callable, hook for dict objects (default: None)
        - object_pairs_hook: callable, hook for key-value pairs
          (default: None)
        - list_hook: callable, hook for list objects (default: None)
        - unicode_errors: str, Unicode error handling (default: None,
          uses 'strict')
        - max_buffer_size: int, buffer size limit (default: 100MB)
        - ext_hook: callable, extension type handler (default: ExtType)
        - max_str_len: int, max string length (deprecated, default: -1)
        - max_bin_len: int, max binary length (deprecated, default: -1)
        - max_array_len: int, max array length (default: -1)
        - max_map_len: int, max map length (default: -1)
        - max_ext_len: int, max extension length (deprecated, default: -1)
        """

    def feed(self, next_bytes):
        """
        Append a chunk of bytes to the unpacker's buffer.

        Parameters:
        - next_bytes: bytes, data to add to buffer

        Returns:
        None

        Raises:
        BufferFull: When buffer size limit exceeded
        """

    def unpack(self):
        """
        Unpack and return the next complete object from the buffer.

        Returns:
        object: Next unpacked object

        Raises:
        OutOfData: When the buffer does not yet hold a complete object
        FormatError: When data format is invalid
        StackError: When data is too nested
        """

    def skip(self):
        """
        Advance past the next object without building a Python value for it.

        Returns:
        None

        Raises:
        OutOfData: When buffer is incomplete
        FormatError: When data format is invalid
        """

    def read_array_header(self):
        """
        Consume an array header and return the element count.

        The elements themselves are left in the stream so the caller can
        unpack them one at a time.

        Returns:
        int: Number of array elements

        Raises:
        OutOfData: When buffer is incomplete
        FormatError: When not an array header

        NOTE(review): upstream msgpack appears to raise a plain ValueError
        (the base class of FormatError) when the next value is not an
        array - confirm which type callers should catch.
        """

    def read_map_header(self):
        """
        Consume a map header and return the number of key-value pairs.

        The keys and values themselves are left in the stream so the caller
        can unpack them one at a time.

        Returns:
        int: Number of map pairs

        Raises:
        OutOfData: When buffer is incomplete
        FormatError: When not a map header

        NOTE(review): upstream msgpack appears to raise a plain ValueError
        (the base class of FormatError) when the next value is not a
        map - confirm which type callers should catch.
        """

    def read_bytes(self, n):
        """
        Read n raw bytes from the buffer without decoding them.

        Parameters:
        - n: int, number of bytes to read

        Returns:
        bytes: Read data

        Raises:
        OutOfData: When insufficient data available

        NOTE(review): upstream msgpack appears to return fewer than n bytes
        instead of raising when the data runs short - confirm, and check
        the length of the result if an exact count matters.
        """

    def tell(self):
        """
        Report the current position in the stream.

        Returns:
        int: Stream offset in bytes
        """

    def __iter__(self):
        """
        Support the iterator protocol; the unpacker is its own iterator.

        Returns:
        self: Iterator object
        """

    def __next__(self):
        """
        Unpack and return the next object during iteration.

        Returns:
        object: Next unpacked object

        Raises:
        StopIteration: When no more objects available
        OutOfData: When buffer is incomplete

        NOTE(review): upstream msgpack converts OutOfData into
        StopIteration while iterating, so OutOfData may never escape from
        this method - confirm.
        """

Usage Examples

Reusable Packer

import msgpack

# autoreset=False keeps every packed object in the packer's buffer
# instead of returning (and discarding) it after each pack() call.
packer = msgpack.Packer(autoreset=False, use_single_float=True, strict_types=True)

# Append three user records to the same buffer
for user_id in (1, 2, 3):
    packer.pack({'type': 'user', 'id': user_id})

# Write the accumulated buffer out in one go
with open('users.msgpack', 'wb') as out:
    out.write(packer.bytes())

# Empty the buffer so the packer can be reused
packer.reset()

Streaming from File

import msgpack

# Iterating an Unpacker bound to a file yields one object per packed message
with open('users.msgpack', 'rb') as stream:
    for user in msgpack.Unpacker(stream, use_list=True):
        print(f"User {user['id']}: {user['type']}")

Socket Streaming

import socket
import msgpack

# Streaming from network socket.
# The context manager closes the socket on normal exit and on error;
# the original example never closed it.
with socket.socket() as sock:
    sock.connect(('localhost', 8080))

    # Cap the amount of not-yet-unpacked data held in memory at 1 MiB
    unpacker = msgpack.Unpacker(raw=False, max_buffer_size=1024*1024)

    while True:
        data = sock.recv(4096)
        if not data:
            # Empty read: the peer closed the connection
            break

        unpacker.feed(data)

        # Iteration yields every complete object currently buffered and
        # stops at a partial one, which stays buffered until more bytes
        # are fed on the next pass.
        for obj in unpacker:
            # process_message is the application's own handler
            process_message(obj)

Manual Array Processing

import msgpack

# Walk a large packed array one element at a time instead of
# materialising the whole list.
unpacker = msgpack.Unpacker()
unpacker.feed(msgpack.packb([1, 2, 3, 4, 5] * 1000))

# The header carries only the element count; the elements stay in the stream
array_len = unpacker.read_array_header()
print(f"Array has {array_len} elements")

# Pull the elements out individually, reporting every 1000th one
for i in range(array_len):
    element = unpacker.unpack()
    if i % 1000:
        continue
    print(f"Processing element {i}: {element}")

Install with Tessl CLI

npx tessl i tessl/pypi-msgpack

docs

core-functions.md

exceptions.md

index.md

streaming.md

types.md

tile.json