Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library
—
This section covers the fundamental Context and Socket classes that form the foundation of all ZMQ communication. These classes provide synchronous messaging operations and support all ZMQ socket types and messaging patterns.
The Context class manages ZMQ contexts, which are containers for all sockets in a single process. Contexts handle I/O threads, socket limits, and global settings.
class Context:
def __init__(self, io_threads: int | Context = 1, shadow: Context | int = 0) -> None:
"""
Create a new ZMQ context.
Parameters:
- io_threads: Number of I/O threads or existing Context to shadow (default: 1)
- shadow: Context or address to shadow (default: 0)
"""
def socket(self, socket_type: int) -> Socket:
"""
Create a socket of the specified type.
Parameters:
- socket_type: ZMQ socket type constant (REQ, REP, PUB, SUB, etc.)
Returns:
- Socket: New socket instance
"""
def term(self) -> None:
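"""Terminate the context. Blocks until every socket created from this context has been closed and pending messages are sent or their linger period expires; it does not close sockets itself (use destroy() for that)."""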
def destroy(self, linger: int = None) -> None:
"""
Close all sockets and terminate context with optional linger period.
Parameters:
- linger: Time in milliseconds to wait for messages to be sent
"""
def set(self, option: int, value: int) -> None:
"""
Set a context option.
Parameters:
- option: Context option constant (IO_THREADS, MAX_SOCKETS, etc.)
- value: Option value
"""
def get(self, option: int) -> int:
"""
Get a context option value.
Parameters:
- option: Context option constant
Returns:
- int: Current option value
"""
def __enter__(self) -> Context:
"""Context manager entry."""
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Context manager exit - destroys context."""
@classmethod
def instance(cls, io_threads: int = 1) -> Context:
"""
Return a global Context instance.
Parameters:
- io_threads: Number of I/O threads for new instance
Returns:
- Context: Global singleton context instance
"""
@classmethod
def shadow(cls, address: int | Context) -> Context:
"""
Shadow an existing libzmq context.
Parameters:
- address: Context or integer address to shadow
Returns:
- Context: New context shadowing the existing one
"""
@property
def underlying(self) -> int:
"""Integer address of the underlying libzmq context."""
@property
def closed(self) -> bool:
"""True if the context has been terminated."""
Usage example:
import zmq
# Create context with 2 I/O threads
context = zmq.Context(io_threads=2)
# Set context options
context.set(zmq.MAX_SOCKETS, 1024)
# Use as context manager for automatic cleanup
with zmq.Context() as ctx:
socket = ctx.socket(zmq.REQ)
# Socket operations...
# Context automatically terminated when leaving the with block
The Socket class provides methods for connecting, binding, sending, and receiving messages across all ZMQ socket types.
class Socket:
def bind(self, address: str) -> SocketContext:
"""
Bind socket to an address. Returns context manager for automatic unbind.
Parameters:
- address: Address string (tcp://*:5555, ipc:///tmp/socket, inproc://workers)
Returns:
- SocketContext: Context manager for automatic unbind on exit
"""
def bind_to_random_port(self, address: str, min_port: int = 49152, max_port: int = 65536, max_tries: int = 100) -> int:
"""
Bind socket to a random port in the specified range.
Parameters:
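- address: Address without the port (e.g. tcp://127.0.0.1 or tcp://*); the chosen port is appended as ":<port>" before binding
- min_port: Minimum port number (inclusive)
- max_port: Maximum port number (exclusive)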
- max_tries: Maximum binding attempts
Returns:
- int: The port number that was bound
"""
def connect(self, address: str) -> SocketContext:
"""
Connect socket to an address. Returns context manager for automatic disconnect.
Parameters:
- address: Address string (tcp://localhost:5555, ipc:///tmp/socket)
Returns:
- SocketContext: Context manager for automatic disconnect on exit
"""
def disconnect(self, address: str) -> None:
"""
Disconnect socket from an address.
Parameters:
- address: Address string to disconnect from
"""
def unbind(self, address: str) -> None:
"""
Unbind socket from an address.
Parameters:
- address: Address string to unbind from
"""
def close(self, linger: int = None) -> None:
"""
Close the socket.
Parameters:
- linger: Linger period in milliseconds (None for default)
"""
def __enter__(self) -> Socket:
"""Context manager entry."""
def __exit__(self, exc_type, exc_value, traceback) -> None:
"""Context manager exit - closes socket."""
def poll(self, timeout: int = -1, flags: int = POLLIN) -> int:
"""
Poll socket for events.
Parameters:
- timeout: Timeout in milliseconds (-1 for infinite)
- flags: Poll flags (POLLIN, POLLOUT, POLLERR)
Returns:
- int: Events that occurred (bitmask)
"""
def fileno(self) -> int:
"""
Get file descriptor for socket integration with select/poll.
Returns:
- int: File descriptor
"""
def subscribe(self, topic: str | bytes) -> None:
"""
Subscribe to a topic (SUB sockets only).
Parameters:
- topic: Topic to subscribe to
"""
def unsubscribe(self, topic: str | bytes) -> None:
"""
Unsubscribe from a topic (SUB sockets only).
Parameters:
- topic: Topic to unsubscribe from
"""
@classmethod
def shadow(cls, address: int | Socket) -> Socket:
"""
Shadow an existing libzmq socket.
Parameters:
- address: Socket or integer address to shadow
Returns:
- Socket: New socket shadowing the existing one
"""
@property
def underlying(self) -> int:
"""Integer address of the underlying libzmq socket."""
@property
def type(self) -> int:
"""Socket type (REQ, REP, PUB, SUB, etc.)."""
@property
def last_endpoint(self) -> str:
"""Last bound or connected endpoint address."""
@property
def copy_threshold(self) -> int:
"""Threshold for copying vs zero-copy operations."""
@copy_threshold.setter
def copy_threshold(self, value: int) -> None:
"""Set copy threshold."""
@property
def closed(self) -> bool:
"""True if the socket has been closed."""
Methods for sending various data types with optional flags and routing information.
def send(self, data: Union[bytes, Frame], flags: int = 0, copy: bool = True, track: bool = False) -> Optional[MessageTracker]:
"""
Send a message.
Parameters:
- data: Message data as bytes or Frame
- flags: Send flags (NOBLOCK, SNDMORE)
- copy: Whether to copy the message data
- track: Whether to return a MessageTracker
Returns:
- MessageTracker: If track=True, tracker for send completion
"""
def send_string(self, string: str, flags: int = 0, encoding: str = 'utf-8', copy: bool = True, track: bool = False) -> MessageTracker | None:
"""
Send a string message.
Parameters:
- string: String to send
- flags: Send flags (NOBLOCK, SNDMORE)
- encoding: String encoding (default: utf-8)
"""
def send_pyobj(self, obj: Any, flags: int = 0, protocol: int = DEFAULT_PROTOCOL, copy: bool = True, track: bool = False) -> MessageTracker | None:
"""
Send a Python object using pickle.
Parameters:
- obj: Python object to send
- flags: Send flags
- protocol: Pickle protocol version
"""
def send_json(self, obj: Any, flags: int = 0, copy: bool = True, track: bool = False, **kwargs) -> MessageTracker | None:
"""
Send a JSON-serializable object.
Parameters:
- obj: JSON-serializable object
- flags: Send flags
- kwargs: Additional arguments for json.dumps()
"""
def send_multipart(self, msg_parts: list, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
"""
Send a multipart message.
Parameters:
- msg_parts: List of message parts (bytes, strings, or Frames)
- flags: Send flags
- copy: Whether to copy message data
- track: Whether to return MessageTracker
Returns:
- MessageTracker: If track=True, tracker for send completion
"""
def send_serialized(self, msg: Any, serialize: Callable, flags: int = 0, copy: bool = True, track: bool = False) -> MessageTracker | None:
"""
Send a message with custom serialization.
Parameters:
- msg: Message to serialize and send
- serialize: Serialization function
- flags: Send flags
- copy: Whether to copy message data
- track: Whether to return MessageTracker
Returns:
- MessageTracker: If track=True, tracker for send completion
"""
Methods for receiving various data types with optional flags and timeout handling.
def recv(self, flags: int = 0, copy: bool = True, track: bool = False) -> Union[bytes, Frame]:
"""
Receive a message.
Parameters:
- flags: Receive flags (NOBLOCK)
- copy: Whether to copy the message data
- track: Whether to return a Frame with tracking
Returns:
- bytes or Frame: Received message data
"""
def recv_string(self, flags: int = 0, encoding: str = 'utf-8') -> str:
"""
Receive a string message.
Parameters:
- flags: Receive flags (NOBLOCK)
- encoding: String encoding (default: utf-8)
Returns:
- str: Received string
"""
def recv_pyobj(self, flags: int = 0) -> Any:
"""
Receive a Python object using pickle.
Parameters:
- flags: Receive flags
Returns:
- Any: Unpickled Python object
"""
def recv_json(self, flags: int = 0, **kwargs) -> Any:
"""
Receive a JSON object.
Parameters:
- flags: Receive flags
- kwargs: Additional arguments for json.loads()
Returns:
- Any: Deserialized JSON object
"""
def recv_multipart(self, flags: int = 0, copy: bool = True, track: bool = False) -> list:
"""
Receive a multipart message.
Parameters:
- flags: Receive flags
- copy: Whether to copy message data
- track: Whether to return Frames with tracking
Returns:
- list: List of message parts
"""
def recv_serialized(self, deserialize: Callable, flags: int = 0, copy: bool = True) -> Any:
"""
Receive a message with custom deserialization.
Parameters:
- deserialize: Deserialization function
- flags: Receive flags
- copy: Whether to copy message data
Returns:
- Any: Deserialized message
"""
def recv_into(self, buf: Any, flags: int = 0, copy: bool = True, track: bool = False) -> int:
"""
Receive a message into an existing buffer.
Parameters:
- buf: Buffer to receive into
- flags: Receive flags
- copy: Whether to copy message data
- track: Whether to return Frame with tracking
Returns:
- int: Number of bytes received
"""
Methods for getting and setting socket options that control behavior, performance, and protocol settings.
def set(self, option: int, value: int | bytes | str) -> None:
"""
Set a socket option (preferred method name).
Parameters:
- option: Socket option constant (LINGER, RCVHWM, SNDHWM, etc.)
- value: Option value (type depends on option)
"""
def setsockopt(self, option: int, value: int | bytes | str) -> None:
"""
Set a socket option.
Parameters:
- option: Socket option constant (LINGER, RCVHWM, SNDHWM, etc.)
- value: Option value (type depends on option)
"""
def get(self, option: int) -> int | bytes:
"""
Get a socket option value (preferred method name).
Parameters:
- option: Socket option constant
Returns:
- int or bytes: Current option value
"""
def getsockopt(self, option: int) -> int | bytes:
"""
Get a socket option value.
Parameters:
- option: Socket option constant
Returns:
- int or bytes: Current option value
"""
def set_string(self, option: int, value: str, encoding: str = 'utf-8') -> None:
"""
Set a socket option with string value (preferred method name).
Parameters:
- option: Socket option constant
- value: String value
- encoding: String encoding
"""
def setsockopt_string(self, option: int, value: str, encoding: str = 'utf-8') -> None:
"""
Set a socket option with string value.
Parameters:
- option: Socket option constant
- value: String value
- encoding: String encoding
"""
def get_string(self, option: int, encoding: str = 'utf-8') -> str:
"""
Get a socket option as string (preferred method name).
Parameters:
- option: Socket option constant
- encoding: String encoding
Returns:
- str: Option value as string
"""
def getsockopt_string(self, option: int, encoding: str = 'utf-8') -> str:
"""
Get a socket option as string.
Parameters:
- option: Socket option constant
- encoding: String encoding
Returns:
- str: Option value as string
"""
@property
def hwm(self) -> int:
"""High water mark for both send and receive."""
@hwm.setter
def hwm(self, value: int) -> None:
"""Set high water mark for both send and receive."""
@property
def linger(self) -> int:
"""Linger period for socket closure."""
@linger.setter
def linger(self, value: int) -> None:
"""Set linger period for socket closure."""
Methods for monitoring socket events and state changes.
def monitor(self, address: str, events: int = EVENT_ALL) -> None:
"""
Start monitoring socket events.
Parameters:
- address: Address for monitor socket (inproc://monitor.socket)
- events: Bitmask of events to monitor
"""
def get_monitor_socket(self, events: int = EVENT_ALL, addr: str = None) -> Socket:
"""
Get a PAIR socket for receiving monitor events.
Parameters:
- events: Bitmask of events to monitor
- addr: Optional address for monitor socket
Returns:
- Socket: PAIR socket for receiving events
"""
def disable_monitor(self) -> None:
"""Stop monitoring socket events."""
import zmq
# Server
with zmq.Context() as context:
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
message = socket.recv_string()
print(f"Received: {message}")
socket.send_string(f"Echo: {message}")
# Client
with zmq.Context() as context:
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
socket.send_string("Hello World")
reply = socket.recv_string()
print(f"Reply: {reply}")
import time
import zmq
# Publisher: emits alternating "news" / "weather" updates on tcp port 5556.
with zmq.Context() as context:
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5556")
    for i in range(100):
        # The topic is sent as the leading text of each message; the
        # subscriber example subscribes to the "weather" prefix.
        topic = "weather" if i % 2 else "news"
        # Keep the payload free of the topic: it is prepended exactly once
        # in the send below (previously it was included twice, producing
        # e.g. "weather weather Update 1").
        message = f"Update {i}"
        socket.send_string(f"{topic} {message}")
        time.sleep(0.1)
# Subscriber
with zmq.Context() as context:
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, "weather")
while True:
message = socket.recv_string()
print(f"Received: {message}")
from typing import Union, Optional, Any, List, Callable
# Message data types
MessageData = Union[bytes, str, memoryview, Frame]
MultipartMessage = List[MessageData]
# Socket option value types
OptionValue = Union[int, bytes, str]
# Address types
Address = str
# Context manager type
SocketContext = Any # Context manager for bind/connect operations
# Serialization function types
Serializer = Callable[[Any], bytes]
Deserializer = Callable[[bytes], Any]
Install with Tessl CLI
npx tessl i tessl/pypi-pyzmq