Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library
—
Exception hierarchy for ZMQ-specific errors with errno mapping, context information, and specialized exceptions for different error conditions.
Foundation exception classes for ZMQ error handling.
class ZMQBaseError(Exception):
"""Base exception class for all ZMQ errors in Python."""
class ZMQError(ZMQBaseError):
"""
Standard ZMQ error wrapping errno-style errors.
Attributes:
- errno: ZMQ error code (int or None)
- strerror: Human-readable error description (str)
"""
def __init__(self, errno: int = None, msg: str = None) -> None:
"""
Create a ZMQ error.
Parameters:
- errno: ZMQ errno code (None to use zmq_errno())
- msg: Custom error message (None for default)
"""
@property
def errno(self) -> int:
"""ZMQ error code."""
@property
def strerror(self) -> str:
"""Human-readable error message."""Specialized exceptions for connection and binding issues.
class ZMQBindError(ZMQError):
"""
Error raised when bind() fails.
Includes additional context about the binding attempt.
"""
def __init__(self, errno: int, msg: str, address: str) -> None:
"""
Create a bind error.
Parameters:
- errno: ZMQ error code
- msg: Error message
- address: Address that failed to bind
"""
@property
def address(self) -> str:
"""Address that failed to bind."""Exceptions related to version compatibility issues.
class ZMQVersionError(ZMQError):
"""
Error raised when ZMQ version requirements are not met.
"""
def __init__(self, min_version: tuple, actual_version: tuple, msg: str = None) -> None:
"""
Create a version error.
Parameters:
- min_version: Minimum required version tuple
- actual_version: Actual ZMQ version tuple
- msg: Custom error message
"""
@property
def min_version(self) -> tuple:
"""Minimum required version."""
@property
def actual_version(self) -> tuple:
"""Actual ZMQ version."""Exceptions specific to asynchronous operations.
class NotDone(ZMQError):
"""
Error raised when async operation is not yet complete.
Used with MessageTracker and Future objects.
"""
class Again(ZMQError):
"""
Error raised for EAGAIN errno (would block).
Commonly raised with NOBLOCK flag operations.
"""Exceptions related to authentication and security.
class ZMQAuthError(ZMQError):
"""
Base class for authentication-related errors.
"""
class ZAPError(ZMQAuthError):
"""
Error in ZAP (ZMQ Authentication Protocol) processing.
"""
def __init__(self, status_code: str, status_text: str = None) -> None:
"""
Create a ZAP error.
Parameters:
- status_code: ZAP status code
- status_text: Human-readable status description
"""
@property
def status_code(self) -> str:
"""ZAP status code."""
@property
def status_text(self) -> str:
"""ZAP status description."""Exceptions related to context operations.
class ContextTerminated(ZMQError):
"""
Error raised when operations are attempted on terminated context.
"""
class InterruptedSystemCall(ZMQError):
"""
Error raised when ZMQ call is interrupted by system signal.
Equivalent to EINTR errno.
"""Exceptions related to message handling.
class MessageError(ZMQError):
"""
Base class for message-related errors.
"""
class MessageTooLong(MessageError):
"""
Error raised when message exceeds size limits.
Corresponds to EMSGSIZE errno.
"""
class InvalidMessage(MessageError):
"""
Error raised for malformed or invalid messages.
"""Warning categories for non-fatal issues.
class DraftFDWarning(RuntimeWarning):
"""
Warning for using experimental FD support on draft sockets.
Used when socket.FD is accessed on thread-safe sockets.
"""
def __init__(self, msg: str = "") -> None:
"""
Create a draft FD warning.
Parameters:
- msg: Custom warning message
"""PyZMQ maps libzmq error codes to Python exceptions:
# Common errno mappings
EAGAIN -> Again # Resource temporarily unavailable
EINTR -> InterruptedSystemCall # Interrupted system call
ETERM -> ContextTerminated # Context terminated
ENOTSUP -> ZMQError # Operation not supported
EINVAL -> ZMQError # Invalid argument
EMSGSIZE -> MessageTooLong # Message too long
EFSM -> ZMQError # Finite state machine errorimport zmq
context = zmq.Context()
socket = context.socket(zmq.REQ)
try:
# This will raise ZMQError (no connection)
socket.recv(zmq.NOBLOCK)
except zmq.Again:
print("No message available (would block)")
except zmq.ZMQError as e:
print(f"ZMQ error {e.errno}: {e.strerror}")
finally:
socket.close()
context.term()import zmq
context = zmq.Context()
socket = context.socket(zmq.REP)
try:
# This may fail if address is already in use
socket.bind("tcp://*:5555")
except zmq.ZMQBindError as e:
print(f"Failed to bind to {e.address}: {e.strerror}")
except zmq.ZMQError as e:
print(f"Other ZMQ error: {e}")
finally:
socket.close()
context.term()import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5555")
while True:
try:
# Non-blocking receive
message = socket.recv(zmq.NOBLOCK)
print(f"Received: {message}")
except zmq.Again:
# No message available, do other work
print("No message, continuing...")
time.sleep(0.1)
except zmq.ZMQError as e:
print(f"ZMQ error: {e}")
break
socket.close()
context.term()import zmq
import threading
import time
def worker():
"""Worker thread that handles context termination"""
try:
socket = context.socket(zmq.PULL)
socket.connect("inproc://work")
while True:
message = socket.recv()
print(f"Processing: {message}")
except zmq.ContextTerminated:
print("Context terminated, worker exiting")
except zmq.ZMQError as e:
print(f"Worker error: {e}")
finally:
socket.close()
context = zmq.Context()
# Start worker thread
worker_thread = threading.Thread(target=worker)
worker_thread.start()
try:
time.sleep(2)
print("Terminating context...")
context.term() # This will cause ContextTerminated in worker
worker_thread.join()
print("Worker thread finished")
except KeyboardInterrupt:
print("Interrupted")
context.term()
worker_thread.join()import zmq
def require_zmq_version(min_version):
"""Ensure minimum ZMQ version or raise error"""
actual = zmq.zmq_version_info()
if actual < min_version:
raise zmq.ZMQVersionError(
min_version=min_version,
actual_version=actual,
msg=f"Requires ZMQ {min_version}, got {actual}"
)
try:
# Require ZMQ 4.2.0 or later
require_zmq_version((4, 2, 0))
print("ZMQ version OK")
except zmq.ZMQVersionError as e:
print(f"Version error: {e}")
print(f"Required: {e.min_version}")
print(f"Actual: {e.actual_version}")import zmq
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")
try:
# Send with tracking
tracker = socket.send(b"Large message", track=True)
# Wait for completion
if not tracker.wait(timeout=5000):
print("Send timeout")
else:
print("Message sent successfully")
except zmq.NotDone:
print("Message sending not complete")
except zmq.ZMQError as e:
print(f"Send error: {e}")
finally:
socket.close()
context.term()import zmq
from zmq.auth import ThreadAuthenticator
context = zmq.Context()
try:
# Server with authentication
auth = ThreadAuthenticator(context)
auth.start()
passwords = {'user': 'secret'}
auth.configure_plain(domain='*', passwords=passwords)
server = context.socket(zmq.REP)
server.set(zmq.PLAIN_SERVER, 1)
server.bind("tcp://*:5555")
# This may raise authentication errors
message = server.recv()
server.send(b"Authenticated response")
except zmq.ZMQAuthError as e:
print(f"Authentication error: {e}")
except zmq.ZMQError as e:
print(f"ZMQ error: {e}")
finally:
if 'server' in locals():
server.close()
if 'auth' in locals():
auth.stop()
context.term()import zmq
import logging
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
context = zmq.Context()
socket = context.socket(zmq.REQ)
try:
socket.connect("tcp://localhost:5555")
socket.send_string("Hello")
# Set receive timeout
socket.setsockopt(zmq.RCVTIMEO, 5000)
response = socket.recv_string()
except zmq.Again:
logger.warning("Receive timeout - no response from server")
except zmq.ZMQError as e:
logger.error(f"ZMQ error {e.errno}: {e.strerror}")
logger.debug(f"Socket state: {socket.getsockopt(zmq.EVENTS)}")
except Exception as e:
logger.exception(f"Unexpected error: {e}")
finally:
socket.close()
context.term()import zmq
import time
import random
def send_with_retry(socket, message, max_retries=3):
"""Send message with exponential backoff retry"""
for attempt in range(max_retries + 1):
try:
socket.send_string(message, zmq.NOBLOCK)
return True
except zmq.Again:
if attempt == max_retries:
raise zmq.ZMQError(zmq.EAGAIN, "Max retries exceeded")
# Exponential backoff with jitter
delay = (2 ** attempt) + random.uniform(0, 1)
print(f"Send failed, retrying in {delay:.2f}s...")
time.sleep(delay)
except zmq.ZMQError as e:
print(f"Non-recoverable error: {e}")
raise
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.setsockopt(zmq.SNDHWM, 1) # Low HWM to trigger blocking
try:
socket.bind("tcp://*:5555")
# This may block/fail due to low HWM
send_with_retry(socket, "Test message")
print("Message sent successfully")
except zmq.ZMQError as e:
print(f"Failed to send: {e}")
finally:
socket.close()
context.term()import zmq
class ZMQOperationError(zmq.ZMQError):
"""Custom error with operation context"""
def __init__(self, operation, errno=None, msg=None):
super().__init__(errno, msg)
self.operation = operation
def __str__(self):
return f"{self.operation} failed: {self.strerror}"
def safe_socket_operation(socket, operation, *args, **kwargs):
"""Wrapper for socket operations with enhanced error context"""
try:
method = getattr(socket, operation)
return method(*args, **kwargs)
except zmq.ZMQError as e:
# Re-raise with operation context
raise ZMQOperationError(
operation=operation,
errno=e.errno,
msg=f"Operation '{operation}' failed: {e.strerror}"
) from e
context = zmq.Context()
socket = context.socket(zmq.REQ)
try:
safe_socket_operation(socket, 'connect', "tcp://localhost:5555")
safe_socket_operation(socket, 'send_string', "Hello")
response = safe_socket_operation(socket, 'recv_string')
print(f"Response: {response}")
except ZMQOperationError as e:
print(f"Operation error: {e}")
print(f"Failed operation: {e.operation}")
except zmq.ZMQError as e:
print(f"ZMQ error: {e}")
finally:
socket.close()
context.term()import zmq
import time
class RobustSocket:
"""Socket wrapper with automatic error recovery"""
def __init__(self, context, socket_type, address):
self.context = context
self.socket_type = socket_type
self.address = address
self.socket = None
self._connect()
def _connect(self):
"""Create and connect socket"""
if self.socket:
self.socket.close()
self.socket = self.context.socket(self.socket_type)
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.connect(self.address)
def send_reliable(self, message, timeout=5000):
"""Send with automatic recovery on failure"""
for attempt in range(3):
try:
self.socket.send_string(message, zmq.NOBLOCK)
return True
except zmq.Again:
# Socket may be blocked, try recreating
print(f"Send blocked, recreating socket (attempt {attempt + 1})")
self._connect()
continue
except zmq.ZMQError as e:
if e.errno == zmq.ETERM:
print("Context terminated")
return False
elif attempt == 2:
raise
else:
print(f"ZMQ error, retrying: {e}")
self._connect()
time.sleep(0.1)
return False
def close(self):
if self.socket:
self.socket.close()
# Usage
context = zmq.Context()
try:
robust_socket = RobustSocket(context, zmq.REQ, "tcp://localhost:5555")
success = robust_socket.send_reliable("Test message")
if success:
print("Message sent successfully")
else:
print("Failed to send message")
except zmq.ZMQError as e:
print(f"Unrecoverable error: {e}")
finally:
robust_socket.close()
context.term()Common ZMQ error codes and their meanings:
import zmq
# Network errors
zmq.EADDRINUSE # Address already in use
zmq.EADDRNOTAVAIL # Cannot assign requested address
zmq.ECONNREFUSED # Connection refused
zmq.ENETUNREACH # Network is unreachable
zmq.ETIMEDOUT # Connection timed out
# State errors
zmq.EFSM # Operation not valid in current state
zmq.ETERM # Context was terminated
zmq.EAGAIN # Resource temporarily unavailable
# Configuration errors
zmq.EINVAL # Invalid argument
zmq.ENOTSUP # Operation not supported
zmq.EMSGSIZE # Message too long
# System errors
zmq.EINTR # Interrupted system call
zmq.ENOMEM # Cannot allocate memory
zmq.EFAULT # Bad addressfrom typing import Optional, Union, Tuple
import errno
# Error types
ErrorCode = int
ErrorMessage = str
ZMQErrorNumber = Union[int, None]
# Version types
VersionTuple = Tuple[int, int, int]
# Address types
BindAddress = str
ConnectAddress = str
# Authentication types
ZAPStatusCode = str
ZAPStatusText = str
# Message types
MessageContent = Union[bytes, str]
MessageSize = int
# Context types
OperationName = str
OperationContext = dictInstall with Tessl CLI
npx tessl i tessl/pypi-pyzmq