Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library
—
Zero-copy message objects (Frame/Message) for efficient data transfer, with support for metadata, copying control, and memory-mapped operations.
Frame objects provide zero-copy message handling with optional tracking and metadata support.
class Frame:
def __init__(self, data: Union[bytes, str, int] = b'', track: bool = True, copy: bool = None) -> None:
"""
Create a new Frame.
Parameters:
- data: Initial data (bytes, string, or buffer size)
- track: Whether to track the Frame lifecycle
- copy: Whether to copy the data (None for auto-detect)
"""
@property
def bytes(self) -> bytes:
"""Get frame data as bytes."""
@property
def buffer(self) -> memoryview:
"""Get frame data as memoryview buffer."""
def copy(self) -> Frame:
"""
Create a copy of the frame.
Returns:
- Frame: New Frame with copied data
"""
def __len__(self) -> int:
"""Get frame size in bytes."""
def __bytes__(self) -> bytes:
"""Convert frame to bytes."""
def __str__(self) -> str:
"""Convert frame to string using utf-8."""
@property
def more(self) -> bool:
"""True if this frame is part of a multipart message with more parts."""
@property
def tracker(self) -> Optional[MessageTracker]:
"""MessageTracker for this frame, if tracking is enabled."""Message objects extend Frame with additional ZMQ message properties and metadata.
class Message(Frame):
def __init__(self, data: Union[bytes, str, int] = b'', track: bool = True, copy: bool = None) -> None:
"""
Create a new Message.
Parameters:
- data: Initial data
- track: Whether to track message lifecycle
- copy: Whether to copy the data
"""
def get(self, property: int) -> int:
"""
Get a message property.
Parameters:
- property: Message property constant (MORE, SRCFD, SHARED, etc.)
Returns:
- int: Property value
"""
def set(self, property: int, value: int) -> None:
"""
Set a message property.
Parameters:
- property: Message property constant
- value: Property value
"""
def gets(self, property: str) -> Optional[str]:
"""
Get a message metadata property as string.
Parameters:
- property: Property name
Returns:
- str or None: Property value as string
"""
def routing_id(self) -> Optional[bytes]:
"""
Get the routing ID for this message.
Returns:
- bytes or None: Routing ID if available
"""
def group(self) -> Optional[str]:
"""
Get the group for this message.
Returns:
- str or None: Group name if set
"""MessageTracker objects allow monitoring the lifecycle of sent messages.
class MessageTracker:
    """Reports when libzmq has finished with the buffers of tracked messages."""

    @property
    def done(self) -> bool:
        """True once libzmq no longer needs the buffers of all tracked messages."""

    def wait(self, timeout: float = -1) -> None:
        """
        Wait for tracked messages to complete.
        Parameters:
        - timeout: Timeout in seconds (-1 for infinite)
        Returns:
        - None when every tracked message completed before the timeout
        Raises:
        - zmq.NotDone: if the timeout expires first
        """
import zmq
# Create frames from different data types
frame1 = zmq.Frame(b"Hello World")
# Frame rejects str with TypeError, so encode text explicitly
frame2 = zmq.Frame("Hello World".encode("utf-8"))
# Frame has no "size" constructor; wrap a pre-sized buffer instead
frame3 = zmq.Frame(bytearray(1024))  # 1024 zero bytes
# Access frame data
data = frame1.bytes
buffer = frame1.buffer
size = len(frame1)
print(f"Frame data: {frame1}")
print(f"Frame size: {size} bytes")
# Copy frames
frame_copy = frame1.copy()
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")
# Create large message
large_data = b"x" * 1_000_000  # 1MB of data
# Send with zero-copy (no data duplication).
# The Frame itself must be created with track=True; sending an untracked
# Frame with track=True is an error.
frame = zmq.Frame(large_data, copy=False, track=True)
tracker = socket.send(frame, copy=False, track=True)
# Wait for message to be sent.
# wait() takes seconds and raises zmq.NotDone on timeout (it returns None).
try:
    tracker.wait(timeout=5)
except zmq.NotDone:
    print("Send timeout")
else:
    print("Large message sent successfully")
socket.close()
context.term()
import zmq
# Create message with tracking
msg = zmq.Message(b"Hello with metadata", track=True)
# Check message properties
has_more = msg.get(zmq.MORE)
is_shared = msg.get(zmq.SHARED)
print(f"Has more parts: {bool(has_more)}")
print(f"Is shared: {bool(is_shared)}")
# Access metadata (if available).
# routing_id and group are attributes, not methods, and are only usable
# when libzmq and pyzmq were built with draft API support.
routing_id = msg.routing_id
group = msg.group
if routing_id:
    print(f"Routing ID: {routing_id}")
if group:
    print(f"Group: {group}")
import zmq
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect("tcp://localhost:5555")
# Create multipart message with frames.
# Each Frame must be created with track=True to be sent with track=True.
header = zmq.Frame(b"HEADER", track=True)
body = zmq.Frame(b"Message body content", track=True)
footer = zmq.Frame(b"FOOTER", track=True)
# Send as multipart message
parts = [header, body, footer]
tracker = socket.send_multipart(parts, copy=False, track=True)
# Wait for completion; wait() returns None, so do not test its result
tracker.wait()
print("Multipart message sent")
socket.close()
context.term()
import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5555")
message_complete = False
while not message_complete:
    # Pull the next frame without copying its payload
    frame = socket.recv(copy=False)
    # A set "more" flag means further frames of the same message follow
    if frame.more:
        print("This frame has more parts")
    # Materialise the payload as bytes
    data = frame.bytes
    print(f"Received {len(data)} bytes")
    # A memoryview over the same payload for copy-free processing
    buffer = frame.buffer
    # Process buffer without copying...
    if not frame.more:
        print("Complete message received")
        message_complete = True
socket.close()
context.term()
import zmq
import mmap
import os
# Create memory-mapped file
filename = "large_data.bin"
with open(filename, "wb") as f:
    f.write(b"x" * 1_000_000)  # 1MB file
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")
try:
    # Memory-map the file; the mapping must stay open until the send is done
    with open(filename, "rb") as f, mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        # Create frame from memory-mapped data (zero-copy); track=True is
        # required on the Frame to get a tracker back from send()
        frame = zmq.Frame(mm, copy=False, track=True)
        # Send memory-mapped data efficiently
        tracker = socket.send(frame, copy=False, track=True)
        # wait() returns None (raises zmq.NotDone on timeout)
        tracker.wait()
        print("Memory-mapped data sent")
        # Release the frame's view of the mapping; an mmap with live buffer
        # exports cannot be closed (BufferError)
        del frame
finally:
    socket.close()
    context.term()
    # Clean up
    os.unlink(filename)
import zmq
import time
from typing import Any, Optional


class TimestampedFrame(zmq.Frame):
    """Frame with timestamp metadata"""

    # Class-level default so the instance assignment in __init__ targets a
    # declared attribute.
    # NOTE(review): zmq.Frame appears to restrict setting undeclared
    # attributes - confirm against the installed pyzmq version.
    _timestamp: float = 0.0

    def __init__(self, data: bytes = b'', timestamp: Optional[float] = None):
        """
        Create a frame stamped with a creation time.
        Parameters:
        - data: Frame contents
        - timestamp: Seconds since the epoch; None means "now"
        """
        super().__init__(data)
        # Compare against None so an explicit timestamp of 0.0 is kept
        self._timestamp = time.time() if timestamp is None else timestamp

    @property
    def timestamp(self) -> float:
        """Timestamp recorded for this frame, in seconds since the epoch."""
        return self._timestamp

    def age(self) -> float:
        """Get age of frame in seconds"""
        return time.time() - self._timestamp


# Usage
frame = TimestampedFrame(b"Hello World")
time.sleep(1)
print(f"Frame age: {frame.age():.2f} seconds")
import zmq
import numpy as np
# Create frame from numpy array.
# Pass the (contiguous) array itself: it exposes the buffer protocol, so the
# Frame shares its memory. array.tobytes() would make a full copy first.
array = np.arange(1000, dtype=np.float64)
frame = zmq.Frame(array, copy=False)
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")
# Send numpy array efficiently
socket.send(frame, copy=False)
socket.close()
context.term()
# Receiving end (run in a separate process from the sender above)
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5555")
# Receive and reconstruct numpy array
frame = socket.recv(copy=False)
# frombuffer creates a view over the frame's memory, not a copy
received_array = np.frombuffer(frame.buffer, dtype=np.float64)
print(f"Received array shape: {received_array.shape}")
print(f"Array data: {received_array[:10]}...")  # First 10 elements
socket.close()
context.term()
import zmq
# NOTE: fragment - assumes `socket` and `large_data` from the earlier examples
# Efficient: Zero-copy sending
frame = zmq.Frame(large_data, copy=False)
socket.send(frame, copy=False)
# Less efficient: Data is copied twice
socket.send(large_data, copy=True)  # Default behavior
# Efficient: Zero-copy receiving
frame = socket.recv(copy=False)
data = frame.buffer  # Access as memoryview
# Less efficient: Data is copied
data = socket.recv(copy=True)  # Returns bytes copy
import zmq
# NOTE: fragment - assumes `socket` exists and `frame` was created with
# track=True, e.g. zmq.Frame(data, copy=False, track=True)
# Track message lifecycle for reliability.
# Tracking only applies to zero-copy sends, so copy=False is required.
tracker = socket.send(frame, copy=False, track=True)
# Non-blocking check
if tracker.done:
    print("Message sent")
# Blocking wait with timeout (seconds); raises zmq.NotDone on timeout
try:
    tracker.wait(timeout=1)
except zmq.NotDone:
    print("Send timeout - message may be lost")
else:
    print("Message confirmed sent")
from typing import Union, Optional, Any
# memoryview is a builtin type and needs no import
# (there is no importable "memoryview" module)
# Frame data types
FrameData = Union[bytes, str, memoryview, int]
BufferLike = Union[bytes, memoryview, bytearray]
# Message property types
MessageProperty = int
PropertyValue = Union[int, str, None]
# Tracking types
TrackerResult = bool

Install with Tessl CLI:

npx tessl i tessl/pypi-pyzmq