CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyzmq

Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library

Pending
Overview
Eval results
Files

core-messaging.mddocs/

Core Messaging

The fundamental Context and Socket classes that form the foundation of all ZMQ communication. These classes provide synchronous messaging operations and support all ZMQ socket types and messaging patterns.

Capabilities

Context Management

The Context class manages ZMQ contexts, which are containers for all sockets in a single process. Contexts handle I/O threads, socket limits, and global settings.

class Context:
    def __init__(self, io_threads: int | Context = 1, shadow: Context | int = 0) -> None:
        """
        Create a new ZMQ context.
        
        Parameters:
        - io_threads: Number of I/O threads or existing Context to shadow (default: 1)
        - shadow: Context or address to shadow (default: 0)
        """

    def socket(self, socket_type: int) -> Socket:
        """
        Create a socket of the specified type.
        
        Parameters:
        - socket_type: ZMQ socket type constant (REQ, REP, PUB, SUB, etc.)
        
        Returns:
        - Socket: New socket instance
        """

    def term(self) -> None:
        """Terminate the context and close all associated sockets."""

    def destroy(self, linger: int = None) -> None:
        """
        Close all sockets and terminate context with optional linger period.
        
        Parameters:
        - linger: Time in milliseconds to wait for messages to be sent
        """

    def set(self, option: int, value: int) -> None:
        """
        Set a context option.
        
        Parameters:
        - option: Context option constant (IO_THREADS, MAX_SOCKETS, etc.)
        - value: Option value
        """

    def get(self, option: int) -> int:
        """
        Get a context option value.
        
        Parameters:
        - option: Context option constant
        
        Returns:
        - int: Current option value
        """

    def __enter__(self) -> Context:
        """Context manager entry."""
    
    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit - destroys context."""
    
    @classmethod
    def instance(cls, io_threads: int = 1) -> Context:
        """
        Return a global Context instance.
        
        Parameters:
        - io_threads: Number of I/O threads for new instance
        
        Returns:
        - Context: Global singleton context instance
        """
    
    @classmethod
    def shadow(cls, address: int | Context) -> Context:
        """
        Shadow an existing libzmq context.
        
        Parameters:
        - address: Context or integer address to shadow
        
        Returns:
        - Context: New context shadowing the existing one
        """
    
    @property
    def underlying(self) -> int:
        """Integer address of the underlying libzmq context."""
    
    @property
    def closed(self) -> bool:
        """True if the context has been terminated."""

Usage example:

import zmq

# Create context with 2 I/O threads
context = zmq.Context(io_threads=2)

# Set context options
context.set(zmq.MAX_SOCKETS, 1024)

# Use as context manager for automatic cleanup
with zmq.Context() as ctx:
    socket = ctx.socket(zmq.REQ)
    # Socket operations...
    # Context automatically terminated when leaving with block

Socket Operations

The Socket class provides methods for connecting, binding, sending, and receiving messages across all ZMQ socket types.

class Socket:
    def bind(self, address: str) -> SocketContext:
        """
        Bind socket to an address. Returns context manager for automatic unbind.
        
        Parameters:
        - address: Address string (tcp://*:5555, ipc:///tmp/socket, inproc://workers)
        
        Returns:
        - SocketContext: Context manager for automatic unbind on exit
        """

    def bind_to_random_port(self, address: str, min_port: int = 49152, max_port: int = 65536, max_tries: int = 100) -> int:
        """
        Bind socket to a random port in the specified range.
        
        Parameters:
        - address: Address template (tcp://*:%s)
        - min_port: Minimum port number
        - max_port: Maximum port number  
        - max_tries: Maximum binding attempts
        
        Returns:
        - int: The port number that was bound
        """

    def connect(self, address: str) -> SocketContext:
        """
        Connect socket to an address. Returns context manager for automatic disconnect.
        
        Parameters:
        - address: Address string (tcp://localhost:5555, ipc:///tmp/socket)
        
        Returns:
        - SocketContext: Context manager for automatic disconnect on exit
        """

    def disconnect(self, address: str) -> None:
        """
        Disconnect socket from an address.
        
        Parameters:
        - address: Address string to disconnect from
        """

    def unbind(self, address: str) -> None:
        """
        Unbind socket from an address.
        
        Parameters:
        - address: Address string to unbind from
        """

    def close(self, linger: int = None) -> None:
        """
        Close the socket.
        
        Parameters:
        - linger: Linger period in milliseconds (None for default)
        """

    def __enter__(self) -> Socket:
        """Context manager entry."""
    
    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit - closes socket."""
    
    def poll(self, timeout: int = -1, flags: int = POLLIN) -> int:
        """
        Poll socket for events.
        
        Parameters:
        - timeout: Timeout in milliseconds (-1 for infinite)
        - flags: Poll flags (POLLIN, POLLOUT, POLLERR)
        
        Returns:
        - int: Events that occurred (bitmask)
        """
    
    def fileno(self) -> int:
        """
        Get file descriptor for socket integration with select/poll.
        
        Returns:
        - int: File descriptor
        """
    
    def subscribe(self, topic: str | bytes) -> None:
        """
        Subscribe to a topic (SUB sockets only).
        
        Parameters:
        - topic: Topic to subscribe to
        """
    
    def unsubscribe(self, topic: str | bytes) -> None:
        """
        Unsubscribe from a topic (SUB sockets only).
        
        Parameters:
        - topic: Topic to unsubscribe from
        """
    
    @classmethod
    def shadow(cls, address: int | Socket) -> Socket:
        """
        Shadow an existing libzmq socket.
        
        Parameters:
        - address: Socket or integer address to shadow
        
        Returns:
        - Socket: New socket shadowing the existing one
        """
    
    @property
    def underlying(self) -> int:
        """Integer address of the underlying libzmq socket."""
    
    @property
    def type(self) -> int:
        """Socket type (REQ, REP, PUB, SUB, etc.)."""
    
    @property
    def last_endpoint(self) -> str:
        """Last bound or connected endpoint address."""
    
    @property
    def copy_threshold(self) -> int:
        """Threshold for copying vs zero-copy operations."""
    
    @copy_threshold.setter
    def copy_threshold(self, value: int) -> None:
        """Set copy threshold."""
    
    @property
    def closed(self) -> bool:
        """True if the socket has been closed."""

Message Sending

Methods for sending various data types with optional flags and routing information.

def send(self, data: Union[bytes, Frame], flags: int = 0, copy: bool = True, track: bool = False) -> Optional[MessageTracker]:
    """
    Send a message.
    
    Parameters:
    - data: Message data as bytes or Frame
    - flags: Send flags (NOBLOCK, SNDMORE)
    - copy: Whether to copy the message data  
    - track: Whether to return a MessageTracker
    
    Returns:
    - MessageTracker: If track=True, tracker for send completion
    """

def send_string(self, string: str, flags: int = 0, encoding: str = 'utf-8', copy: bool = True, track: bool = False) -> MessageTracker | None:
    """
    Send a string message.
    
    Parameters:
    - string: String to send
    - flags: Send flags (NOBLOCK, SNDMORE)
    - encoding: String encoding (default: utf-8)
    """

def send_pyobj(self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, copy: bool = True, track: bool = False) -> MessageTracker | None:
    """
    Send a Python object using pickle.
    
    Parameters:
    - obj: Python object to send
    - flags: Send flags
    - protocol: Pickle protocol version
    """

def send_json(self, obj: Any, flags: int = 0, copy: bool = True, track: bool = False, **kwargs) -> MessageTracker | None:
    """
    Send a JSON-serializable object.
    
    Parameters:
    - obj: JSON-serializable object
    - flags: Send flags
    - kwargs: Additional arguments for json.dumps()
    """

def send_multipart(self, msg_parts: list, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
    """
    Send a multipart message.
    
    Parameters:
    - msg_parts: List of message parts (bytes, strings, or Frames)
    - flags: Send flags
    - copy: Whether to copy message data
    - track: Whether to return MessageTracker
    
    Returns:
    - MessageTracker: If track=True, tracker for send completion
    """

def send_serialized(self, msg: Any, serialize: Callable, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
    """
    Send a message with custom serialization.
    
    Parameters:
    - msg: Message to serialize and send
    - serialize: Serialization function
    - flags: Send flags
    - copy: Whether to copy message data  
    - track: Whether to return MessageTracker
    
    Returns:
    - MessageTracker: If track=True, tracker for send completion
    """
    """
    Send a multipart message.
    
    Parameters:
    - msg_parts: List of message parts (bytes, strings, or Frames)
    - flags: Send flags
    - copy: Whether to copy message data
    - track: Whether to return MessageTracker
    
    Returns:
    - MessageTracker: If track=True, tracker for send completion
    """

Message Receiving

Methods for receiving various data types with optional flags and timeout handling.

def recv(self, flags: int = 0, copy: bool = True, track: bool = False) -> Union[bytes, Frame]:
    """
    Receive a message.
    
    Parameters:
    - flags: Receive flags (NOBLOCK)
    - copy: Whether to copy the message data
    - track: Whether to return a Frame with tracking
    
    Returns:
    - bytes or Frame: Received message data
    """

def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
    """
    Receive a string message.
    
    Parameters:
    - flags: Receive flags (NOBLOCK)
    - encoding: String encoding (default: utf-8)
    
    Returns:
    - str: Received string
    """

def recv_pyobj(self, flags: int = 0) -> Any:
    """
    Receive a Python object using pickle.
    
    Parameters:
    - flags: Receive flags
    
    Returns:
    - Any: Unpickled Python object
    """

def recv_json(self, flags: int = 0, **kwargs) -> Any:
    """
    Receive a JSON object.
    
    Parameters:
    - flags: Receive flags
    - kwargs: Additional arguments for json.loads()
    
    Returns:
    - Any: Deserialized JSON object
    """

def recv_multipart(self, flags: int = 0, copy: bool = True, track: bool = False) -> list:
    """
    Receive a multipart message.
    
    Parameters:
    - flags: Receive flags
    - copy: Whether to copy message data
    - track: Whether to return Frames with tracking
    
    Returns:
    - list: List of message parts
    """

def recv_serialized(self, deserialize: Callable, flags: int = 0, copy: bool = True) -> Any:
    """
    Receive a message with custom deserialization.
    
    Parameters:
    - deserialize: Deserialization function
    - flags: Receive flags
    - copy: Whether to copy message data
    
    Returns:
    - Any: Deserialized message
    """

def recv_into(self, buf: Any, flags: int = 0, copy: bool = True, track: bool = False) -> int:
    """
    Receive a message into an existing buffer.
    
    Parameters:
    - buf: Buffer to receive into
    - flags: Receive flags
    - copy: Whether to copy message data
    - track: Whether to return Frame with tracking
    
    Returns:
    - int: Number of bytes received
    """

Socket Configuration

Methods for getting and setting socket options that control behavior, performance, and protocol settings.

def set(self, option: int, value: int | bytes | str) -> None:
    """
    Set a socket option (preferred method name).
    
    Parameters:
    - option: Socket option constant (LINGER, RCVHWM, SNDHWM, etc.)
    - value: Option value (type depends on option)
    """

def setsockopt(self, option: int, value: int | bytes | str) -> None:
    """
    Set a socket option.
    
    Parameters:
    - option: Socket option constant (LINGER, RCVHWM, SNDHWM, etc.)
    - value: Option value (type depends on option)
    """

def get(self, option: int) -> int | bytes:
    """
    Get a socket option value (preferred method name).
    
    Parameters:
    - option: Socket option constant
    
    Returns:
    - int or bytes: Current option value
    """

def getsockopt(self, option: int) -> int | bytes:
    """
    Get a socket option value.
    
    Parameters:
    - option: Socket option constant
    
    Returns:
    - int or bytes: Current option value
    """

def set_string(self, option: int, value: str, encoding: str = 'utf-8') -> None:
    """
    Set a socket option with string value (preferred method name).
    
    Parameters:
    - option: Socket option constant
    - value: String value
    - encoding: String encoding
    """

def setsockopt_string(self, option: int, value: str, encoding: str = 'utf-8') -> None:
    """
    Set a socket option with string value.
    
    Parameters:
    - option: Socket option constant
    - value: String value
    - encoding: String encoding
    """

def get_string(self, option: int, encoding: str = 'utf-8') -> str:
    """
    Get a socket option as string (preferred method name).
    
    Parameters:
    - option: Socket option constant
    - encoding: String encoding
    
    Returns:
    - str: Option value as string
    """

def getsockopt_string(self, option: int, encoding: str = 'utf-8') -> str:
    """
    Get a socket option as string.
    
    Parameters:
    - option: Socket option constant
    - encoding: String encoding
    
    Returns:
    - str: Option value as string
    """

@property
def hwm(self) -> int:
    """High water mark for both send and receive."""

@hwm.setter
def hwm(self, value: int) -> None:
    """Set high water mark for both send and receive."""

@property
def linger(self) -> int:
    """Linger period for socket closure."""

@linger.setter  
def linger(self, value: int) -> None:
    """Set linger period for socket closure."""

Socket Monitoring

Methods for monitoring socket events and state changes.

def monitor(self, address: str, events: int = EVENT_ALL) -> None:
    """
    Start monitoring socket events.
    
    Parameters:
    - address: Address for monitor socket (inproc://monitor.socket)
    - events: Bitmask of events to monitor
    """

def get_monitor_socket(self, events: int = EVENT_ALL, addr: str = None) -> Socket:
    """
    Get a PAIR socket for receiving monitor events.
    
    Parameters:
    - events: Bitmask of events to monitor  
    - addr: Optional address for monitor socket
    
    Returns:
    - Socket: PAIR socket for receiving events
    """

def disable_monitor(self) -> None:
    """Stop monitoring socket events."""

Usage Examples

Request-Reply Pattern

import zmq

# Server
with zmq.Context() as context:
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")
    
    while True:
        message = socket.recv_string()
        print(f"Received: {message}")
        socket.send_string(f"Echo: {message}")

# Client  
with zmq.Context() as context:
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    
    socket.send_string("Hello World")
    reply = socket.recv_string()
    print(f"Reply: {reply}")

Publisher-Subscriber Pattern

import time
import zmq

# Publisher
with zmq.Context() as context:
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5556")
    
    for i in range(100):
        topic = "weather" if i % 2 else "news"
        message = f"{topic} Update {i}"
        socket.send_string(f"{topic} {message}")
        time.sleep(0.1)

# Subscriber
with zmq.Context() as context:
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5556")
    socket.setsockopt_string(zmq.SUBSCRIBE, "weather")
    
    while True:
        message = socket.recv_string()
        print(f"Received: {message}")

Types

from typing import Union, Optional, Any, List, Callable

# Message data types
MessageData = Union[bytes, str, memoryview, Frame]
MultipartMessage = List[MessageData]

# Socket option value types  
OptionValue = Union[int, bytes, str]

# Address types
Address = str

# Context manager type
SocketContext = Any  # Context manager for bind/connect operations

# Serialization function types
Serializer = Callable[[Any], bytes]
Deserializer = Callable[[bytes], Any]

Install with Tessl CLI

npx tessl i tessl/pypi-pyzmq

docs

async-support.md

authentication.md

constants.md

core-messaging.md

devices.md

error-handling.md

index.md

message-handling.md

polling.md

tile.json