CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pyzmq

Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library

Pending
Overview
Eval results
Files

docs/error-handling.md

Error Handling

Exception hierarchy for ZMQ-specific errors with errno mapping, context information, and specialized exceptions for different error conditions.

Capabilities

Base Exception Classes

Foundation exception classes for ZMQ error handling.

class ZMQBaseError(Exception):
    """Base exception class for all ZMQ errors in Python.

    Every exception class documented on this page derives from it.
    """

class ZMQError(ZMQBaseError):
    """
    Standard ZMQ error wrapping errno-style errors.

    Attributes:
    - errno: ZMQ error code (int or None)
    - strerror: Human-readable error description (str)
    """

    def __init__(self, errno: "int | None" = None, msg: "str | None" = None) -> None:
        """
        Create a ZMQ error.

        Parameters:
        - errno: ZMQ errno code (None to use zmq_errno())
        - msg: Custom error message (None for default)
        """

    @property
    def errno(self) -> "int | None":
        """ZMQ error code (int or None)."""

    @property
    def strerror(self) -> str:
        """Human-readable error message."""

Connection Errors

Specialized exceptions for connection and binding issues.

# NOTE(review): upstream pyzmq appears to define ZMQBindError as a direct
# subclass of ZMQBaseError that is raised by Socket.bind_to_random_port(),
# with no custom constructor and no `address` attribute; a failing
# Socket.bind() is reported as a plain ZMQError (e.g. errno EADDRINUSE).
# Confirm against the zmq.error reference before relying on this signature.
class ZMQBindError(ZMQError):
    """
    Error raised when bind() fails.

    Includes additional context about the binding attempt.
    """

    def __init__(self, errno: int, msg: str, address: str) -> None:
        """
        Create a bind error.

        Parameters:
        - errno: ZMQ error code
        - msg: Error message
        - address: Address that failed to bind
        """

    @property
    def address(self) -> str:
        """Address that failed to bind."""

Version Errors

Exceptions related to version compatibility issues.

# NOTE(review): upstream pyzmq appears to declare this class as
# ZMQVersionError(NotImplementedError) with the constructor
# (min_version, msg="Feature") and the attributes `min_version`, `msg` and
# `version` (the linked libzmq version). There seems to be no
# `actual_version` parameter or attribute - confirm against the zmq.error
# reference.
class ZMQVersionError(ZMQError):
    """
    Error raised when ZMQ version requirements are not met.
    """

    def __init__(self, min_version: tuple, actual_version: tuple, msg: "str | None" = None) -> None:
        """
        Create a version error.

        Parameters:
        - min_version: Minimum required version tuple
        - actual_version: Actual ZMQ version tuple
        - msg: Custom error message (None for default)
        """

    @property
    def min_version(self) -> tuple:
        """Minimum required version."""

    @property
    def actual_version(self) -> tuple:
        """Actual ZMQ version."""

Async Errors

Exceptions specific to asynchronous operations.

# NOTE(review): upstream pyzmq appears to derive NotDone from ZMQBaseError
# (not ZMQError), so an `except ZMQError` clause would not catch it; it is
# raised when a timeout expires while waiting on a tracked message.
# Confirm against the zmq.error reference.
class NotDone(ZMQError):
    """
    Error raised when async operation is not yet complete.

    Used with MessageTracker and Future objects.
    """

class Again(ZMQError):
    """
    Error raised for EAGAIN errno (would block).

    Commonly raised with NOBLOCK flag operations. The examples on this page
    also catch it to detect an expired receive timeout (RCVTIMEO).
    """

Security Errors

Exceptions related to authentication and security.

# NOTE(review): neither ZMQAuthError nor ZAPError appears among the names
# exported by upstream pyzmq's zmq.error module - confirm these classes
# exist before documenting or catching them.
class ZMQAuthError(ZMQError):
    """
    Base class for authentication-related errors.
    """

class ZAPError(ZMQAuthError):
    """
    Error in ZAP (ZMQ Authentication Protocol) processing.
    """

    def __init__(self, status_code: str, status_text: "str | None" = None) -> None:
        """
        Create a ZAP error.

        Parameters:
        - status_code: ZAP status code
        - status_text: Human-readable status description (optional)
        """

    @property
    def status_code(self) -> str:
        """ZAP status code."""

    @property
    def status_text(self) -> str:
        """ZAP status description."""

Context Errors

Exceptions related to context operations.

class ContextTerminated(ZMQError):
    """
    Error raised when operations are attempted on terminated context.

    Corresponds to ETERM errno.
    """

class InterruptedSystemCall(ZMQError):
    """
    Error raised when ZMQ call is interrupted by system signal.

    Corresponds to EINTR errno.
    """

Message Errors

Exceptions related to message handling.

# NOTE(review): MessageError, MessageTooLong and InvalidMessage do not
# appear among the names exported by upstream pyzmq's zmq.error module; an
# EMSGSIZE failure presumably surfaces as a plain ZMQError. Confirm before
# relying on these classes.
class MessageError(ZMQError):
    """
    Base class for message-related errors.
    """

class MessageTooLong(MessageError):
    """
    Error raised when message exceeds size limits.

    Corresponds to EMSGSIZE errno.
    """

class InvalidMessage(MessageError):
    """
    Error raised for malformed or invalid messages.
    """

Warning Classes

Warning categories for non-fatal issues.

class DraftFDWarning(RuntimeWarning):
    """
    Warning for using experimental FD support on draft sockets.

    Used when socket.FD is accessed on thread-safe sockets. As a
    RuntimeWarning subclass it can be filtered with the standard
    ``warnings`` machinery.
    """

    def __init__(self, msg: str = "") -> None:
        """
        Create a draft FD warning.

        Parameters:
        - msg: Custom warning message (defaults to an empty string)
        """

Error Code Mapping

PyZMQ maps libzmq error codes to Python exceptions:

# Common errno mappings
EAGAIN -> Again              # Resource temporarily unavailable
EINTR -> InterruptedSystemCall   # Interrupted system call
ETERM -> ContextTerminated   # Context terminated
ENOTSUP -> ZMQError         # Operation not supported
EINVAL -> ZMQError          # Invalid argument
EMSGSIZE -> MessageTooLong  # Message too long
EFSM -> ZMQError            # Finite state machine error

Usage Examples

Basic Error Handling

import zmq

# Context and Socket are context managers: leaving the `with` block closes
# the socket and then shuts down the context, even when an error escapes.
with zmq.Context() as context, context.socket(zmq.REQ) as socket:
    try:
        # A REQ socket must send a request before it may receive, so this
        # recv() fails with a ZMQError (errno EFSM) instead of zmq.Again.
        socket.recv(zmq.NOBLOCK)
    except zmq.Again:
        # More specific subclass first: nothing was available to read.
        print("No message available (would block)")
    except zmq.ZMQError as e:
        print(f"ZMQ error {e.errno}: {e.strerror}")

Specific Error Types

import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
address = "tcp://*:5555"

try:
    # This may fail if address is already in use. bind() reports every
    # failure as ZMQError, so inspect errno to recognise the specific cause.
    socket.bind(address)
except zmq.ZMQError as e:
    if e.errno == zmq.EADDRINUSE:
        print(f"Failed to bind to {address}: {e.strerror}")
    else:
        print(f"Other ZMQ error: {e}")
finally:
    socket.close()
    context.term()

Non-Blocking Operations

import time

import zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5555")

try:
    while True:
        try:
            # Non-blocking receive
            message = socket.recv(zmq.NOBLOCK)
            print(f"Received: {message}")
        except zmq.Again:
            # No message available, do other work
            print("No message, continuing...")
            time.sleep(0.1)
        except zmq.ZMQError as e:
            print(f"ZMQ error: {e}")
            break
finally:
    # Runs on a normal break as well as on Ctrl-C or any unexpected error,
    # so the socket and context are always released.
    socket.close()
    context.term()

Context Termination Handling

import zmq
import threading
import time

def worker():
    """Worker thread that handles context termination.

    Pulls messages from ``inproc://work`` on the module-level ``context``
    until the context is terminated or another ZMQ error occurs, then
    closes its socket.
    """
    # Bound up front so the cleanup below is safe even when socket
    # creation itself fails (e.g. the context is already terminated).
    socket = None
    try:
        socket = context.socket(zmq.PULL)
        socket.connect("inproc://work")

        while True:
            message = socket.recv()
            print(f"Processing: {message}")
    except zmq.ContextTerminated:
        print("Context terminated, worker exiting")
    except zmq.ZMQError as e:
        print(f"Worker error: {e}")
    finally:
        if socket is not None:
            socket.close()

# Module-level context; worker() creates its socket from this name.
context = zmq.Context()

# Start worker thread
worker_thread = threading.Thread(target=worker)
worker_thread.start()

try:
    # Let the worker run for a while before shutting everything down.
    time.sleep(2)
    print("Terminating context...")
    context.term()  # This will cause ContextTerminated in worker

    # Wait for the worker to finish its own cleanup.
    worker_thread.join()
    print("Worker thread finished")
except KeyboardInterrupt:
    # Ctrl-C: perform the same shutdown sequence (terminate, then join).
    print("Interrupted")
    context.term()
    worker_thread.join()

Version Checking with Error Handling

import zmq

def require_zmq_version(min_version):
    """Ensure the linked libzmq is at least ``min_version``.

    Parameters:
    - min_version: Required version as a sequence of ints, e.g. (4, 2, 0)

    Raises:
    - zmq.ZMQVersionError: if the linked libzmq is older than min_version
    """
    actual = zmq.zmq_version_info()

    # tuple() lets callers pass a list without breaking the comparison.
    if actual < tuple(min_version):
        # ZMQVersionError takes the required version as a string plus a
        # short label for whatever needs it; it looks up and stores the
        # linked libzmq version itself (exposed as ``.version``).
        raise zmq.ZMQVersionError(
            ".".join(str(part) for part in min_version),
            "This application",
        )

try:
    # Require ZMQ 4.2.0 or later
    require_zmq_version((4, 2, 0))
    print("ZMQ version OK")
except zmq.ZMQVersionError as e:
    print(f"Version error: {e}")
    print(f"Required: {e.min_version}")
    print(f"Actual: {e.version}")

Message Tracking Error Handling

import zmq

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5555")

try:
    # Tracking is only available for zero-copy sends: with the default
    # copy=True the track flag is ignored and send() returns None.
    tracker = socket.send(b"Large message", copy=False, track=True)

    # wait() takes its timeout in seconds, returns None once ZMQ is done
    # with the message, and raises NotDone if the timeout expires first.
    tracker.wait(timeout=5)
    print("Message sent successfully")

except zmq.NotDone:
    print("Send timeout: message sending not complete")
except zmq.ZMQError as e:
    print(f"Send error: {e}")
finally:
    socket.close()
    context.term()

Authentication Error Handling

import zmq
from zmq.auth import ThreadAuthenticator

context = zmq.Context()
# Bound up front so the cleanup below can test them directly instead of
# probing locals().
auth = None
server = None

try:
    # Server with authentication
    auth = ThreadAuthenticator(context)
    auth.start()

    passwords = {'user': 'secret'}
    auth.configure_plain(domain='*', passwords=passwords)

    server = context.socket(zmq.REP)
    server.set(zmq.PLAIN_SERVER, 1)
    server.bind("tcp://*:5555")

    # Only ZMQError is caught below: pyzmq does not export a dedicated
    # authentication exception, and naming a missing attribute in an
    # `except` clause would itself fail while handling an error.
    # NOTE(review): rejected clients are presumably dropped by the
    # authenticator before any message reaches this socket - confirm.
    message = server.recv()
    server.send(b"Authenticated response")

except zmq.ZMQError as e:
    print(f"ZMQ error: {e}")
finally:
    if server is not None:
        server.close()
    if auth is not None:
        auth.stop()
    context.term()

Error Logging and Debugging

import zmq
import logging

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

context = zmq.Context()
socket = context.socket(zmq.REQ)

try:
    socket.connect("tcp://localhost:5555")
    socket.send_string("Hello")

    # Set receive timeout (milliseconds); recv raises zmq.Again on expiry
    socket.setsockopt(zmq.RCVTIMEO, 5000)
    response = socket.recv_string()

except zmq.Again:
    logger.warning("Receive timeout - no response from server")
except zmq.ZMQError as e:
    # Lazy %-style arguments: formatting only happens if the record is emitted.
    logger.error("ZMQ error %s: %s", e.errno, e.strerror)
    logger.debug("Socket state: %s", socket.getsockopt(zmq.EVENTS))
except Exception as e:
    logger.exception("Unexpected error: %s", e)
finally:
    # Discard any request that was never delivered; with the default
    # (infinite) linger, context.term() would block forever when no server
    # is listening.
    socket.close(linger=0)
    context.term()

Retry Logic with Error Handling

import zmq
import time
import random

def send_with_retry(socket, message, max_retries=3):
    """Send ``message`` without blocking, backing off between attempts.

    Parameters:
    - socket: Socket used for the non-blocking ``send_string`` call
    - message: Text to send
    - max_retries: Number of retries after the first attempt

    Returns True once the message has been handed to the socket.

    Raises zmq.ZMQError with EAGAIN when every attempt would block, and
    re-raises any other ZMQError immediately.
    """
    last_attempt = max_retries

    for attempt in range(max_retries + 1):
        try:
            socket.send_string(message, zmq.NOBLOCK)
        except zmq.Again:
            # The send would block; give up only after the final attempt.
            if attempt == last_attempt:
                raise zmq.ZMQError(zmq.EAGAIN, "Max retries exceeded")

            # Exponential backoff (1s, 2s, 4s, ...) plus up to 1s of jitter.
            backoff = (2 ** attempt) + random.uniform(0, 1)
            print(f"Send failed, retrying in {backoff:.2f}s...")
            time.sleep(backoff)
        except zmq.ZMQError as err:
            print(f"Non-recoverable error: {err}")
            raise
        else:
            return True

context = zmq.Context()
socket = context.socket(zmq.PUSH)
# Send high-water mark of 1: low HWM to trigger blocking.
socket.setsockopt(zmq.SNDHWM, 1)

try:
    socket.bind("tcp://*:5555")

    # May block/fail because of the low HWM; the helper retries with backoff.
    send_with_retry(socket, "Test message")
except zmq.ZMQError as err:
    print(f"Failed to send: {err}")
else:
    print("Message sent successfully")
finally:
    socket.close()
    context.term()

Custom Error Context

import zmq

class ZMQOperationError(zmq.ZMQError):
    """ZMQError that also records which socket operation failed."""

    def __init__(self, operation, errno=None, msg=None):
        # Remember the operation name; errno/msg are passed to the base class.
        self.operation = operation
        super().__init__(errno, msg)

    def __str__(self):
        return "{} failed: {}".format(self.operation, self.strerror)

def safe_socket_operation(socket, operation, *args, **kwargs):
    """Call ``socket.<operation>(*args, **kwargs)`` and return its result.

    Any zmq.ZMQError raised by the call is re-raised as ZMQOperationError
    carrying the operation name, chained to the original exception.
    """
    try:
        return getattr(socket, operation)(*args, **kwargs)
    except zmq.ZMQError as original:
        # Re-raise with operation context
        detail = f"Operation '{operation}' failed: {original.strerror}"
        raise ZMQOperationError(
            operation=operation,
            errno=original.errno,
            msg=detail,
        ) from original

context = zmq.Context()
socket = context.socket(zmq.REQ)

try:
    # Each step goes through the wrapper so a failure names the operation.
    safe_socket_operation(socket, 'connect', "tcp://localhost:5555")
    safe_socket_operation(socket, 'send_string', "Hello")
    response = safe_socket_operation(socket, 'recv_string')
    print(f"Response: {response}")

except ZMQOperationError as e:
    # Subclass first: ZMQOperationError derives from zmq.ZMQError.
    print(f"Operation error: {e}")
    print(f"Failed operation: {e.operation}")
except zmq.ZMQError as e:
    print(f"ZMQ error: {e}")
finally:
    socket.close()
    context.term()

Error Recovery Patterns

import zmq
import time

class RobustSocket:
    """Socket wrapper that recreates its socket when a send fails."""

    def __init__(self, context, socket_type, address):
        """Store the connection settings and open the first socket.

        Parameters:
        - context: Context used to create sockets
        - socket_type: Socket type passed to ``context.socket``
        - address: Endpoint passed to ``connect``
        """
        self.context, self.socket_type, self.address = context, socket_type, address
        self.socket = None
        self._connect()

    def _connect(self):
        """Close the current socket, if any, and connect a fresh one."""
        previous = self.socket
        if previous:
            previous.close()

        replacement = self.socket = self.context.socket(self.socket_type)
        # LINGER 0 is set on every socket before it is connected.
        replacement.setsockopt(zmq.LINGER, 0)
        replacement.connect(self.address)

    def send_reliable(self, message, timeout=5000):
        """Send ``message`` without blocking, trying up to three times.

        Parameters:
        - message: Text to send
        - timeout: Accepted but not used by this implementation

        Returns True when a send succeeds, and False when the context has
        been terminated or every attempt would block. Any other ZMQError
        on the third attempt is re-raised.
        """
        for attempt_number in (1, 2, 3):
            try:
                self.socket.send_string(message, zmq.NOBLOCK)
            except zmq.Again:
                # Socket may be blocked, try recreating
                print(f"Send blocked, recreating socket (attempt {attempt_number})")
                self._connect()
            except zmq.ZMQError as err:
                if err.errno == zmq.ETERM:
                    print("Context terminated")
                    return False
                if attempt_number == 3:
                    raise
                print(f"ZMQ error, retrying: {err}")
                self._connect()
                time.sleep(0.1)
            else:
                return True

        return False

    def close(self):
        """Close the underlying socket if one is open."""
        current = self.socket
        if current:
            current.close()

# Usage
context = zmq.Context()
# Bound up front: if the constructor raises, the cleanup below must not
# fail with a NameError that hides the original error.
robust_socket = None

try:
    robust_socket = RobustSocket(context, zmq.REQ, "tcp://localhost:5555")

    success = robust_socket.send_reliable("Test message")
    if success:
        print("Message sent successfully")
    else:
        print("Failed to send message")

except zmq.ZMQError as e:
    print(f"Unrecoverable error: {e}")
finally:
    if robust_socket is not None:
        robust_socket.close()
    context.term()

Error Code Reference

Common ZMQ error codes and their meanings:

import zmq

# The examples on this page compare constants like these with the `errno`
# attribute of a caught ZMQError.

# Network errors
zmq.EADDRINUSE      # Address already in use
zmq.EADDRNOTAVAIL   # Cannot assign requested address
zmq.ECONNREFUSED    # Connection refused
zmq.ENETUNREACH     # Network is unreachable
zmq.ETIMEDOUT       # Connection timed out

# State errors
zmq.EFSM            # Operation not valid in current state
zmq.ETERM           # Context was terminated
zmq.EAGAIN          # Resource temporarily unavailable

# Configuration errors
zmq.EINVAL          # Invalid argument
zmq.ENOTSUP         # Operation not supported
zmq.EMSGSIZE        # Message too long

# System errors
zmq.EINTR           # Interrupted system call
zmq.ENOMEM          # Cannot allocate memory
zmq.EFAULT          # Bad address

Types

from typing import Optional, Union, Tuple
import errno

# Error types: numeric codes and their textual descriptions
ErrorCode = int
ErrorMessage = str
ZMQErrorNumber = Optional[int]  # None when no errno is available

# Version types: (major, minor, patch)
VersionTuple = Tuple[int, int, int]

# Address types: endpoint strings such as "tcp://*:5555"
BindAddress = ConnectAddress = str

# Authentication types
ZAPStatusCode = ZAPStatusText = str

# Message types
MessageContent = Union[bytes, str]
MessageSize = int

# Context types: names and metadata of wrapped operations
OperationName = str
OperationContext = dict

Install with Tessl CLI

npx tessl i tessl/pypi-pyzmq

docs

async-support.md

authentication.md

constants.md

core-messaging.md

devices.md

error-handling.md

index.md

message-handling.md

polling.md

tile.json