CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyzmq

Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library

Pending
Overview
Eval results
Files

message-handling.mddocs/

Message Handling

Zero-copy message objects (Frame/Message) for efficient data transfer, with support for metadata, copying control, and memory-mapped operations.

Capabilities

Frame Objects

Frame objects provide zero-copy message handling with optional tracking and metadata support.

class Frame:
    def __init__(self, data: Union[bytes, str, int] = b'', track: bool = True, copy: bool = None) -> None:
        """
        Create a new Frame.
        
        Parameters:
        - data: Initial data (bytes, string, or buffer size)
        - track: Whether to track the Frame lifecycle
        - copy: Whether to copy the data (None for auto-detect)
        """

    @property
    def bytes(self) -> bytes:
        """Get frame data as bytes."""

    @property  
    def buffer(self) -> memoryview:
        """Get frame data as memoryview buffer."""

    def copy(self) -> Frame:
        """
        Create a copy of the frame.
        
        Returns:
        - Frame: New Frame with copied data
        """

    def __len__(self) -> int:
        """Get frame size in bytes."""

    def __bytes__(self) -> bytes:
        """Convert frame to bytes."""

    def __str__(self) -> str:
        """Convert frame to string using utf-8."""

    @property
    def more(self) -> bool:
        """True if this frame is part of a multipart message with more parts."""

    @property
    def tracker(self) -> Optional[MessageTracker]:
        """MessageTracker for this frame, if tracking is enabled."""

Message Objects

Message objects extend Frame with additional ZMQ message properties and metadata.

class Message(Frame):
    def __init__(self, data: Union[bytes, str, int] = b'', track: bool = True, copy: bool = None) -> None:
        """
        Create a new Message.
        
        Parameters:
        - data: Initial data
        - track: Whether to track message lifecycle  
        - copy: Whether to copy the data
        """

    def get(self, property: int) -> int:
        """
        Get a message property.
        
        Parameters:
        - property: Message property constant (MORE, SRCFD, SHARED, etc.)
        
        Returns:
        - int: Property value
        """

    def set(self, property: int, value: int) -> None:
        """
        Set a message property.
        
        Parameters:
        - property: Message property constant
        - value: Property value
        """

    def gets(self, property: str) -> Optional[str]:
        """
        Get a message metadata property as string.
        
        Parameters:
        - property: Property name
        
        Returns:
        - str or None: Property value as string
        """

    def routing_id(self) -> Optional[bytes]:
        """
        Get the routing ID for this message.
        
        Returns:
        - bytes or None: Routing ID if available
        """

    def group(self) -> Optional[str]:
        """
        Get the group for this message.
        
        Returns:
        - str or None: Group name if set
        """

Message Tracking

MessageTracker objects allow monitoring the lifecycle of sent messages.

class MessageTracker:
    @property
    def done(self) -> bool:
        """True if all tracked messages have been sent/received."""

    def wait(self, timeout: int = -1) -> bool:
        """
        Wait for tracked messages to complete.
        
        Parameters:
        - timeout: Timeout in milliseconds (-1 for infinite)
        
        Returns:
        - bool: True if completed, False if timeout
        """

Usage Examples

Basic Frame Operations

import zmq

# Create frames from different data types
frame1 = zmq.Frame(b"Hello World")
frame2 = zmq.Frame("Hello World")  # Auto-encoded as UTF-8  
frame3 = zmq.Frame(1024)  # Create frame with 1024 bytes capacity

# Access frame data
data = frame1.bytes
buffer = frame1.buffer
size = len(frame1)

print(f"Frame data: {frame1}")
print(f"Frame size: {size} bytes")

# Copy frames
frame_copy = frame1.copy()

Zero-Copy Message Sending

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")

# Create large message
large_data = b"x" * 1000000  # 1MB of data

# Send with zero-copy (no data duplication)
frame = zmq.Frame(large_data, copy=False)
tracker = socket.send(frame, copy=False, track=True)

# Wait for message to be sent
if tracker.wait(timeout=5000):
    print("Large message sent successfully")
else:
    print("Send timeout")

socket.close()
context.term()

Message Properties and Metadata

import zmq

# Create message with tracking
msg = zmq.Message(b"Hello with metadata", track=True)

# Check message properties  
has_more = msg.get(zmq.MORE)
is_shared = msg.get(zmq.SHARED)

print(f"Has more parts: {bool(has_more)}")
print(f"Is shared: {bool(is_shared)}")

# Access metadata (if available)
routing_id = msg.routing_id()
group = msg.group()

if routing_id:
    print(f"Routing ID: {routing_id}")
if group:
    print(f"Group: {group}")

Multipart Message Construction

import zmq

context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect("tcp://localhost:5555")

# Create multipart message with frames
header = zmq.Frame(b"HEADER")
body = zmq.Frame(b"Message body content")
footer = zmq.Frame(b"FOOTER")

# Send as multipart message
parts = [header, body, footer]
tracker = socket.send_multipart(parts, copy=False, track=True)

# Wait for completion
if tracker.wait():
    print("Multipart message sent")

socket.close()
context.term()

Receiving and Processing Frames

import zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5555")

while True:
    # Receive frame (zero-copy)
    frame = socket.recv(copy=False)
    
    # Check if it's part of multipart message
    if frame.more:
        print("This frame has more parts")
    
    # Process frame data
    data = frame.bytes
    print(f"Received {len(data)} bytes")
    
    # Access as buffer for efficient processing
    buffer = frame.buffer
    # Process buffer without copying...
    
    if not frame.more:
        print("Complete message received")
        break

socket.close()
context.term()

Memory-Mapped Frame Creation

import zmq
import mmap
import os

# Create memory-mapped file
filename = "large_data.bin"
with open(filename, "wb") as f:
    f.write(b"x" * 1000000)  # 1MB file

# Memory-map the file
with open(filename, "rb") as f:
    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # Create frame from memory-mapped data (zero-copy)
        frame = zmq.Frame(mm, copy=False)
        
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.bind("tcp://*:5555")
        
        # Send memory-mapped data efficiently
        tracker = socket.send(frame, copy=False, track=True)
        
        if tracker.wait():
            print("Memory-mapped data sent")
        
        socket.close()
        context.term()

# Clean up
os.unlink(filename)

Custom Frame Subclassing

import zmq
from typing import Any

class TimestampedFrame(zmq.Frame):
    """Frame with timestamp metadata"""
    
    def __init__(self, data: bytes = b'', timestamp: float = None):
        super().__init__(data)
        self._timestamp = timestamp or time.time()
    
    @property
    def timestamp(self) -> float:
        return self._timestamp
    
    def age(self) -> float:
        """Get age of frame in seconds"""
        return time.time() - self._timestamp

# Usage
import time

frame = TimestampedFrame(b"Hello World")
time.sleep(1)
print(f"Frame age: {frame.age():.2f} seconds")

Frame Buffer Operations

import zmq
import numpy as np

# Create frame from numpy array
array = np.arange(1000, dtype=np.float64)
frame = zmq.Frame(array.tobytes(), copy=False)

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")

# Send numpy array efficiently
socket.send(frame, copy=False)

socket.close()
context.term()

# Receiving end
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5555")

# Receive and reconstruct numpy array
frame = socket.recv(copy=False)
received_array = np.frombuffer(frame.buffer, dtype=np.float64)

print(f"Received array shape: {received_array.shape}")
print(f"Array data: {received_array[:10]}...")  # First 10 elements

socket.close()
context.term()

Performance Considerations

Zero-Copy Operations

import zmq

# Efficient: Zero-copy sending
frame = zmq.Frame(large_data, copy=False)
socket.send(frame, copy=False)

# Less efficient: Data is copied twice
socket.send(large_data, copy=True)  # Default behavior

# Efficient: Zero-copy receiving  
frame = socket.recv(copy=False)
data = frame.buffer  # Access as memoryview

# Less efficient: Data is copied
data = socket.recv(copy=True)  # Returns bytes copy

Message Tracking

import zmq

# Track message lifecycle for reliability
tracker = socket.send(frame, track=True)

# Non-blocking check
if tracker.done:
    print("Message sent")

# Blocking wait with timeout
if tracker.wait(timeout=1000):
    print("Message confirmed sent")
else:
    print("Send timeout - message may be lost")

Types

from typing import Union, Optional, Any
import memoryview

# Frame data types
FrameData = Union[bytes, str, memoryview, int]
BufferLike = Union[bytes, memoryview, bytearray]

# Message property types
MessageProperty = int
PropertyValue = Union[int, str, None]

# Tracking types
TrackerResult = bool

Install with Tessl CLI

npx tessl i tessl/pypi-pyzmq

docs

async-support.md

authentication.md

constants.md

core-messaging.md

devices.md

error-handling.md

index.md

message-handling.md

polling.md

tile.json