Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library
—
Comprehensive constants for socket types, options, events, and error codes, organized as Python enums for type safety and IDE support.
ZMQ socket type constants defining communication patterns.
class SocketType(IntEnum):
    """ZMQ socket types for different messaging patterns.

    Every member is declared as an ``int``-typed name only; the numeric
    values are not shown in this listing.
    """

    # Basic socket types
    PAIR: int  # Exclusive pair pattern
    PUB: int  # Publisher socket
    SUB: int  # Subscriber socket
    REQ: int  # Request socket (client)
    REP: int  # Reply socket (server)
    DEALER: int  # Dealer socket (async REQ)
    ROUTER: int  # Router socket (async REP)
    PULL: int  # Pull socket (worker)
    PUSH: int  # Push socket (ventilator)
    XPUB: int  # Extended publisher
    XSUB: int  # Extended subscriber
    STREAM: int  # Stream socket for TCP

    # Deprecated aliases
    XREQ: int  # Deprecated alias for DEALER
    XREP: int  # Deprecated alias for ROUTER

    # Draft socket types (experimental)
    SERVER: int  # Server socket
    CLIENT: int  # Client socket
    RADIO: int  # Radio socket
    DISH: int  # Dish socket
    GATHER: int  # Gather socket
    SCATTER: int  # Scatter socket
    DGRAM: int  # Datagram socket
    PEER: int  # Peer socket
    CHANNEL: int  # Channel socket

Extensive socket configuration options for controlling behavior and performance.
class SocketOption(IntEnum):
    """Socket configuration options.

    Every member is declared as an ``int``-typed name only; the numeric
    values are not shown in this listing. The ``#`` headings below group the
    options by purpose.
    """

    # Connection and identity
    AFFINITY: int  # I/O thread affinity
    ROUTING_ID: int  # Socket routing ID
    SUBSCRIBE: int  # Subscription filter (SUB)
    UNSUBSCRIBE: int  # Remove subscription (SUB)
    RATE: int  # Multicast data rate
    RECOVERY_IVL: int  # Multicast recovery interval
    SNDBUF: int  # Send buffer size
    RCVBUF: int  # Receive buffer size
    RCVMORE: int  # More message parts available

    # Flow control
    RCVHWM: int  # Receive high water mark
    SNDHWM: int  # Send high water mark
    HWM: int  # High water mark (both)

    # Timeouts and intervals
    LINGER: int  # Socket linger period
    RECONNECT_IVL: int  # Reconnection interval
    RECONNECT_IVL_MAX: int  # Maximum reconnection interval
    BACKLOG: int  # Listen backlog
    MAXMSGSIZE: int  # Maximum message size

    # Socket state
    FD: int  # Socket file descriptor
    EVENTS: int  # Socket event state
    TYPE: int  # Socket type
    RCVTIMEO: int  # Receive timeout
    SNDTIMEO: int  # Send timeout

    # Protocol options
    IPV6: int  # IPv6 socket option
    IPV4ONLY: int  # IPv4-only socket
    IMMEDIATE: int  # Queue messages only for completed connections
    # NOTE(review): presumably the older name for IMMEDIATE -- confirm
    # against the libzmq option reference.
    DELAY_ATTACH_ON_CONNECT: int  # Delay connection completion

    # Security and authentication
    PLAIN_SERVER: int  # PLAIN security server
    PLAIN_USERNAME: int  # PLAIN username
    PLAIN_PASSWORD: int  # PLAIN password
    CURVE_SERVER: int  # CURVE security server
    CURVE_PUBLICKEY: int  # CURVE public key
    CURVE_SECRETKEY: int  # CURVE secret key
    CURVE_SERVERKEY: int  # CURVE server key
    ZAP_DOMAIN: int  # ZAP authentication domain

    # Advanced options
    TCP_KEEPALIVE: int  # TCP keepalive
    TCP_KEEPALIVE_CNT: int  # TCP keepalive count
    TCP_KEEPALIVE_IDLE: int  # TCP keepalive idle
    TCP_KEEPALIVE_INTVL: int  # TCP keepalive interval
    TCP_ACCEPT_FILTER: int  # TCP accept filter

    # Monitoring and debugging
    MONITOR: int  # Socket monitoring
    LAST_ENDPOINT: int  # Last bound/connected endpoint
    ROUTER_MANDATORY: int  # Router mandatory routing
    ROUTER_RAW: int  # Router raw mode
    PROBE_ROUTER: int  # Probe router connections
    REQ_CORRELATE: int  # REQ correlation
    REQ_RELAXED: int  # REQ relaxed mode

    # Multicast options
    MULTICAST_HOPS: int  # Multicast hop limit
    MULTICAST_MAXTPDU: int  # Maximum transport data unit

    # Performance options
    USE_FD: int  # Use file descriptor
    ROUTER_HANDOVER: int  # Router handover
    TOS: int  # Type of service
    ROUTER_NOTIFY: int  # Router notifications
    BINDTODEVICE: int  # Bind to device
    GSSAPI_SERVER: int  # GSSAPI server mode
    GSSAPI_PLAINTEXT: int  # GSSAPI plaintext mode

Options for configuring ZMQ contexts.
class ContextOption(IntEnum):
    """Context configuration options.

    Every member is declared as an ``int``-typed name only; the numeric
    values are not shown in this listing. The usage example on this page
    passes these to ``Context.set`` / ``Context.get``.
    """

    IO_THREADS: int  # Number of I/O threads
    MAX_SOCKETS: int  # Maximum sockets per context
    SOCKET_LIMIT: int  # Socket limit
    THREAD_PRIORITY: int  # Thread priority
    THREAD_SCHED_POLICY: int  # Thread scheduling policy
    MAX_MSGSZ: int  # Maximum message size
    MSG_T_SIZE: int  # Message structure size
    THREAD_AFFINITY_CPU_ADD: int  # Add CPU affinity
    THREAD_AFFINITY_CPU_REMOVE: int  # Remove CPU affinity
    THREAD_NAME_PREFIX: int  # Thread name prefix

Constants for socket monitoring and event handling.
from enum import IntFlag


class Event(IntFlag):
    """Socket event types for monitoring.

    Declared as an ``IntFlag`` rather than an ``IntEnum`` because events are
    bit flags: several can be OR-ed into one mask, as in
    ``zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED``.

    Every member is declared as an ``int``-typed name only; the numeric
    values are not shown in this listing.
    """

    CONNECTED: int  # Socket connected
    CONNECT_DELAYED: int  # Connection delayed
    CONNECT_RETRIED: int  # Connection retried
    LISTENING: int  # Socket listening
    BIND_FAILED: int  # Bind failed
    ACCEPTED: int  # Connection accepted
    ACCEPT_FAILED: int  # Accept failed
    CLOSED: int  # Socket closed
    CLOSE_FAILED: int  # Close failed
    DISCONNECTED: int  # Socket disconnected
    MONITOR_STOPPED: int  # Monitoring stopped
    HANDSHAKE_FAILED_NO_DETAIL: int  # Handshake failed
    HANDSHAKE_SUCCEEDED: int  # Handshake succeeded
    HANDSHAKE_FAILED_PROTOCOL: int  # Protocol handshake failed
    HANDSHAKE_FAILED_AUTH: int  # Auth handshake failed

    # Event aggregates
    ALL: int  # All events

Constants for polling socket events.
from enum import IntFlag


class PollEvent(IntFlag):
    """Socket polling event flags.

    Declared as an ``IntFlag`` rather than an ``IntEnum`` because the values
    are bit flags that are OR-ed when registering (``zmq.POLLIN |
    zmq.POLLOUT``) and AND-ed when testing a poll result.

    Every member is declared as an ``int``-typed name only; the numeric
    values are not shown in this listing.
    """

    POLLIN: int  # Socket readable
    POLLOUT: int  # Socket writable
    POLLERR: int  # Socket error

Options for message handling and properties.
class MessageOption(IntEnum):
    """Message property constants.

    Every member is declared as an ``int``-typed name only; the numeric
    values are not shown in this listing.
    """

    MORE: int  # More message parts follow
    SRCFD: int  # Source file descriptor
    SHARED: int  # Message is shared
    PROBE_ROUTER: int  # Probe router message
    ROUTING_ID: int  # Message routing ID
    GROUP: int  # Message group

ZMQ-specific error codes extending standard errno values.
class Errno(IntEnum):
    """ZMQ error codes.

    Lists the standard errno names first, followed by the ZMQ-specific ones.
    Every member is declared as an ``int``-typed name only; the numeric
    values are not shown in this listing. The error-handling example on this
    page compares ``ZMQError.errno`` against these names.
    """

    # Standard errno values
    EAGAIN: int  # Resource temporarily unavailable
    EFAULT: int  # Bad address
    EINVAL: int  # Invalid argument
    EMSGSIZE: int  # Message too long
    ENOTSUP: int  # Operation not supported
    EPROTONOSUPPORT: int  # Protocol not supported
    ENOBUFS: int  # No buffer space available
    ENETDOWN: int  # Network is down
    EADDRINUSE: int  # Address already in use
    EADDRNOTAVAIL: int  # Cannot assign requested address
    ECONNREFUSED: int  # Connection refused
    EINPROGRESS: int  # Operation now in progress
    ENOTSOCK: int  # Socket operation on non-socket
    EAFNOSUPPORT: int  # Address family not supported
    ENETUNREACH: int  # Network is unreachable
    ECONNABORTED: int  # Software caused connection abort
    ECONNRESET: int  # Connection reset by peer
    ENOTCONN: int  # Transport endpoint is not connected
    ETIMEDOUT: int  # Connection timed out
    EHOSTUNREACH: int  # No route to host
    ENETRESET: int  # Network dropped connection because of reset

    # ZMQ-specific errors
    EFSM: int  # Operation cannot be accomplished in current state
    ENOCOMPATPROTO: int  # The protocol is not compatible with the socket type
    ETERM: int  # Context was terminated
    EMTHREAD: int  # No I/O thread available

Constants for ZMQ devices.
class DeviceType(IntEnum):
    """Device type constants.

    Every member is declared as an ``int``-typed name only; the numeric
    values are not shown in this listing.
    """

    QUEUE: int  # Load-balancing queue device
    FORWARDER: int  # Message forwarder device
    STREAMER: int  # Message streamer device

Constants for security and authentication mechanisms.
class SecurityMechanism(IntEnum):
    """Security mechanism constants.

    Every member is declared as an ``int``-typed name only; the numeric
    values are not shown in this listing.
    """

    NULL: int  # No security
    PLAIN: int  # Plain text authentication
    CURVE: int  # CURVE encryption
    GSSAPI: int  # GSSAPI authentication

Various other constants used throughout PyZMQ.
from enum import IntFlag


class Flag(IntFlag):
    """Message and socket flags passed to send/recv calls.

    Declared as an ``IntFlag`` rather than an ``IntEnum`` because these
    values are bit flags that may be OR-ed together in a single ``flags``
    argument.

    Every member is declared as an ``int``-typed name only; the numeric
    values are not shown in this listing.
    """

    NOBLOCK: int  # Legacy alias for DONTWAIT
    DONTWAIT: int  # Non-blocking operation
    SNDMORE: int  # Send more parts

Constants related to ZMQ version handling.
# Version constants (module-level names, declared as int annotations only;
# their values are not shown in this listing)
VERSION_MAJOR: int  # Major version number
VERSION_MINOR: int  # Minor version number
VERSION_PATCH: int  # Patch version number
VERSION: int  # Combined version number

# Protocol versions
PROTOCOL_ERROR_ZMTP_V1: int  # ZMTP 1.0 protocol error
PROTOCOL_ERROR_ZMTP_V3: int  # ZMTP 3.0 protocol error
PROTOCOL_ERROR_ZAP_V1: int  # ZAP 1.0 protocol error

import zmq
context = zmq.Context()

# One socket per messaging pattern, selected by its socket type constant
socket_types = (zmq.PUB, zmq.SUB, zmq.REQ, zmq.REP, zmq.DEALER, zmq.ROUTER)
publisher, subscriber, requester, replier, dealer, router = (
    context.socket(socket_type) for socket_type in socket_types
)

# Release every socket before terminating the context
for socket in (publisher, subscriber, requester, replier, dealer, router):
    socket.close()
context.term()

import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)

# Options to apply, in this order: option constant -> value
settings = {
    zmq.SNDHWM: 1000,  # Send high water mark
    zmq.RCVHWM: 1000,  # Receive high water mark
    zmq.LINGER: 1000,  # Linger period
    zmq.RCVTIMEO: 5000,  # Receive timeout
    zmq.SNDTIMEO: 5000,  # Send timeout
    zmq.SNDBUF: 65536,  # Send buffer size
    zmq.RCVBUF: 65536,  # Receive buffer size
}
for option, value in settings.items():
    socket.setsockopt(option, value)

# Read a few options back, then report them
readings = [
    (label, socket.getsockopt(option))
    for label, option in (
        ("Send HWM", zmq.SNDHWM),
        ("Socket type", zmq.TYPE),
        ("Events", zmq.EVENTS),
    )
]
for label, value in readings:
    print(f"{label}: {value}")

socket.close()
context.term()

import zmq
# Create a context and apply its options in order
context = zmq.Context()
context_settings = (
    (zmq.IO_THREADS, 4),  # 4 I/O threads
    (zmq.MAX_SOCKETS, 1024),  # Max 1024 sockets
    (zmq.THREAD_PRIORITY, 1),  # Thread priority
)
for option, value in context_settings:
    context.set(option, value)

# Query two of the options back, then report them
io_threads, max_sockets = (
    context.get(option) for option in (zmq.IO_THREADS, zmq.MAX_SOCKETS)
)
print(f"I/O threads: {io_threads}")
print(f"Max sockets: {max_sockets}")

context.term()

import zmq
from zmq.utils.monitor import recv_monitor_message

context = zmq.Context()
socket = context.socket(zmq.REQ)

# Publish monitor events on an inproc endpoint. The event mask selects what
# is reported: OR specific events together as below, or pass zmq.EVENT_ALL
# to receive every event. Only one monitor is attached per socket, so
# monitor() is called once.
socket.monitor("inproc://monitor", zmq.EVENT_CONNECTED | zmq.EVENT_DISCONNECTED)

# Create the PAIR socket that receives the events
monitor = context.socket(zmq.PAIR)
monitor.connect("inproc://monitor")

socket.connect("tcp://localhost:5555")

# Monitor events arrive as multipart binary frames, not pickled objects, so
# they are decoded with recv_monitor_message() rather than recv_pyobj().
try:
    event = recv_monitor_message(monitor, zmq.NOBLOCK)
    print(f"Event: {event}")
except zmq.Again:
    print("No events")

# Stop monitoring before closing the sockets
socket.disable_monitor()
socket.close()
monitor.close()
context.term()

import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN | zmq.POLLOUT)

# Each poll flag paired with the message reported when it is set
flag_messages = (
    (zmq.POLLIN, "Socket is readable"),
    (zmq.POLLOUT, "Socket is writable"),
    (zmq.POLLERR, "Socket has error"),
)

# Wait up to 1000 ms, then report every flag set on each ready socket
for _ready_socket, mask in poller.poll(1000):
    for flag, message in flag_messages:
        if mask & flag:
            print(message)

socket.close()
context.term()

import zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)

try:
    # Nothing has been sent on this socket, so a non-blocking recv
    # cannot succeed and raises ZMQError.
    socket.recv(zmq.NOBLOCK)
except zmq.ZMQError as err:
    # Map the error codes of interest to their messages; anything else
    # falls through to the generic report.
    known_errors = {
        zmq.EAGAIN: "No message available (non-blocking)",
        zmq.EFSM: "Finite state machine error",
        zmq.ETERM: "Context was terminated",
    }
    print(known_errors.get(err.errno, f"Other ZMQ error: {err}"))

socket.close()
context.term()

import zmq
context = zmq.Context()

# Each peer needs its own CURVE key pair. They are generated here so the
# example is self-contained; curve_keypair() returns (public, secret).
server_public_key, server_secret_key = zmq.curve_keypair()
client_public_key, client_secret_key = zmq.curve_keypair()

server = context.socket(zmq.REP)

# Configure CURVE security
server.setsockopt(zmq.CURVE_SERVER, 1)
server.setsockopt(zmq.CURVE_SECRETKEY, server_secret_key)

# Configure PLAIN security
# server.setsockopt(zmq.PLAIN_SERVER, 1)

# Configure authentication domain
server.setsockopt_string(zmq.ZAP_DOMAIN, "global")

server.bind("tcp://*:5555")

# Client configuration: its own key pair plus the server's public key
client = context.socket(zmq.REQ)
client.setsockopt(zmq.CURVE_PUBLICKEY, client_public_key)
client.setsockopt(zmq.CURVE_SECRETKEY, client_secret_key)
client.setsockopt(zmq.CURVE_SERVERKEY, server_public_key)

client.connect("tcp://localhost:5555")

server.close()
client.close()
context.term()

import zmq
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")

# A PUSH socket blocks in send() while it has no connected peer, so attach
# a PULL peer first; otherwise the first send below would never return.
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5555")

# Send multipart message
socket.send(b"Part 1", zmq.SNDMORE)  # More parts follow
socket.send(b"Part 2", zmq.SNDMORE)  # More parts follow
socket.send(b"Part 3")  # Last part (no SNDMORE)

# The three parts arrive together as one multipart message
print(receiver.recv_multipart())

# Non-blocking send
try:
    socket.send(b"Message", zmq.NOBLOCK)
except zmq.Again:
    print("Send would block")

receiver.close()
socket.close()
context.term()

import zmq
# Report the PyZMQ and libzmq versions
print(f"PyZMQ version: {zmq.pyzmq_version()}")
print(f"libzmq version: {zmq.zmq_version()}")

# Version info as tuple
zmq_version_info = zmq.zmq_version_info()
print(f"libzmq version info: {zmq_version_info}")

# Probe optional capabilities: capability name -> message when present
feature_messages = (
    ("curve", "CURVE security available"),
    ("ipc", "IPC transport available"),
    ("pgm", "PGM multicast available"),
)
for feature, message in feature_messages:
    if zmq.has(feature):
        print(message)

import zmq
from zmq.devices import Device

# Device is built from a device type plus the *socket types* of its two
# sides. It creates its own sockets when started, so Socket instances are
# not passed in and there are no sockets for this script to close.
queue_device = Device(zmq.QUEUE, zmq.ROUTER, zmq.DEALER)
# Other device types follow the same shape, for example:
#   Device(zmq.FORWARDER, zmq.SUB, zmq.PUB)
#   Device(zmq.STREAMER, zmq.PULL, zmq.PUSH)

# Endpoints are recorded here and applied when the device starts
queue_device.bind_in("tcp://*:5559")  # frontend (ROUTER)
queue_device.bind_out("tcp://*:5560")  # backend (DEALER)

try:
    # start() on the base Device class runs the device in the calling
    # thread, so this blocks until interrupted. ThreadDevice and
    # ProcessDevice run it in the background instead.
    queue_device.start()
except KeyboardInterrupt:
    pass

import zmq
from zmq.constants import SocketOption, SocketType

# The sockets below need a context to be created from
context = zmq.Context()

# Access constants directly
socket = context.socket(zmq.REQ)
socket.setsockopt(zmq.LINGER, 1000)
socket.close()  # close before the name is rebound below

# Access through enum classes (type-safe)
socket = context.socket(SocketType.REQ)
socket.setsockopt(SocketOption.LINGER, 1000)
socket.close()

# Check if constant exists
if hasattr(zmq, 'CURVE_SERVER'):
    print("CURVE security available")

# Iterate over enum values
for socket_type in SocketType:
    print(f"Socket type: {socket_type.name} = {socket_type.value}")

context.term()

from enum import IntEnum
from typing import Union
# Type aliases used to describe constant values. Every alias except
# ConstantValue and VersionTuple is plain ``int``; the separate names only
# document intent.

# Enum base types
ZMQConstant = int
ConstantValue = Union[int, str, bytes]

# Socket configuration types
SocketTypeValue = int  # Socket type constant value
SocketOptionValue = int  # Socket option constant value
ContextOptionValue = int  # Context option constant value
EventValue = int  # Event constant value
ErrorValue = int  # Error code constant value

# Flag combinations
MessageFlags = int  # Combination of message flags
PollFlags = int  # Combination of poll flags
EventFlags = int  # Combination of event flags

# Version types
VersionTuple = tuple[int, int, int]  # (major, minor, patch)
VersionNumber = int  # Combined version number

Install with Tessl CLI
npx tessl i tessl/pypi-pyzmq