Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library
npx @tessl/cli install tessl/pypi-pyzmq@27.0.0

Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library. PyZMQ enables Python applications to use ZeroMQ's high-performance asynchronous messaging patterns including request-reply, publish-subscribe, push-pull, and dealer-router patterns. It supports both CPython and PyPy implementations and provides comprehensive async/await support for modern Python applications.
pip install pyzmq

import zmq

For async support:

import zmq.asyncio

import zmq
# Create context and socket for publisher
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
# Send messages
for i in range(10):
    socket.send_string(f"Hello World {i}")
# Clean up
socket.close()
context.term()

Context manager usage:
import zmq
# Using context manager for automatic cleanup
with zmq.Context() as context:
    with context.socket(zmq.REQ) as socket:
        socket.connect("tcp://localhost:5555")
        socket.send_string("Hello")
        message = socket.recv_string()
        print(f"Received: {message}")

PyZMQ provides multiple layers of abstraction:
The fundamental Context and Socket classes that form the foundation of all ZMQ communication patterns. Includes synchronous and asynchronous messaging with support for all ZMQ socket types.
class Context:
def socket(self, socket_type: int) -> Socket: ...
def term(self) -> None: ...
class Socket:
def bind(self, address: str) -> None: ...
def connect(self, address: str) -> None: ...
def send(self, data: bytes, flags: int = 0) -> None: ...
def recv(self, flags: int = 0) -> bytes: ...
def send_string(self, string: str, flags: int = 0, encoding: str = 'utf-8') -> None: ...
def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str: ...

Native async/await support with AsyncIO-compatible Context, Socket, and Poller classes that integrate seamlessly with Python's event loop for non-blocking messaging operations.
class Context:
def socket(self, socket_type: int) -> Socket: ...
class Socket:
async def send(self, data: bytes, flags: int = 0) -> None: ...
async def recv(self, flags: int = 0) -> bytes: ...
async def send_string(self, string: str, flags: int = 0, encoding: str = 'utf-8') -> None: ...
async def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str: ...

Zero-copy message objects (Frame/Message) for efficient data transfer, with support for metadata, copying control, and memory-mapped operations.
class Frame:
def __init__(self, data: bytes = b'', track: bool = True) -> None: ...
@property
def bytes(self) -> bytes: ...
def copy(self) -> Frame: ...
class Message(Frame):
def get(self, property: int) -> int: ...
def set(self, property: int, value: int) -> None: ...

High-performance event polling for monitoring multiple sockets simultaneously, with support for timeouts and different polling backends.
class Poller:
def register(self, socket: Socket, flags: int = POLLIN | POLLOUT) -> None: ...
def unregister(self, socket: Socket) -> None: ...
def poll(self, timeout: int = -1) -> list[tuple[Socket, int]]: ...
def select(rlist: list, wlist: list, xlist: list, timeout: float = None) -> tuple: ...

ZAP (ZMQ Authentication Protocol) implementation with support for NULL, PLAIN, and CURVE security mechanisms, including certificate generation and management.
class Authenticator:
def configure_plain(self, domain: str = '*', passwords: dict = None) -> None: ...
def configure_curve(self, domain: str = '*', location: str = '') -> None: ...
def curve_keypair() -> tuple[bytes, bytes]: ...
def curve_public(secret_key: bytes) -> bytes: ...

Background devices for message routing including proxy, queue, forwarder, and steerable proxy implementations with monitoring capabilities.
def proxy(frontend: Socket, backend: Socket, capture: Socket = None) -> None: ...
def proxy_steerable(frontend: Socket, backend: Socket, capture: Socket = None, control: Socket = None) -> None: ...
class ThreadDevice:
def start(self) -> None: ...
def join(self, timeout: float = None) -> None: ...

Comprehensive constants for socket types, options, events, and error codes, organized as Python enums for type safety and IDE support.
class SocketType(IntEnum):
PAIR: int
PUB: int
SUB: int
REQ: int
REP: int
DEALER: int
ROUTER: int
PULL: int
PUSH: int
class SocketOption(IntEnum):
AFFINITY: int
ROUTING_ID: int
SUBSCRIBE: int
UNSUBSCRIBE: int
RATE: int
RECOVERY_IVL: int

Exception hierarchy for ZMQ-specific errors with errno mapping, context information, and specialized exceptions for different error conditions.
class ZMQError(Exception):
errno: int
strerror: str
class ZMQBindError(ZMQError): ...
class ZMQVersionError(ZMQError): ...
class NotDone(ZMQError): ...

Functions for retrieving PyZMQ and libzmq version information.
def pyzmq_version() -> str:
"""Return the version of PyZMQ as a string."""
def pyzmq_version_info() -> tuple[int, int, int] | tuple[int, int, int, float]:
"""Return the PyZMQ version as a tuple of at least three numbers."""
def zmq_version() -> str:
"""Return the version of libzmq as a string."""
def zmq_version_info() -> tuple[int, int, int]:
"""Return the libzmq version as a tuple of three integers."""
__version__: str # PyZMQ version string
__revision__: str # Git revision string

Helper functions for building and linking against PyZMQ.
def get_includes() -> list[str]:
"""Return directories to include for linking against PyZMQ with Cython."""
def get_library_dirs() -> list[str]:
"""Return directories used to link against PyZMQ's bundled libzmq."""
def has(capability: str) -> bool:
"""Check if libzmq has a specific capability (curve, ipc, pgm, etc.)."""
def strerror(errno: int) -> str:
"""Return error string for given ZMQ errno."""
def zmq_errno() -> int:
"""Return the current ZMQ errno."""
# Important constants
COPY_THRESHOLD: int # Copy threshold for message handling (65536)
DRAFT_API: bool # True if draft API features are available
IPC_PATH_MAX_LEN: int # Maximum length for IPC socket paths

from typing import Union, Optional, Any, Callable
# Common type aliases
SocketType = int
Address = str
Data = Union[bytes, str, memoryview, Frame]
Flags = int
Timeout = int
MessageTracker = Any # Message tracking object
Frame = Any # ZMQ frame object for zero-copy operations