Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library
—
High-performance event polling for monitoring multiple sockets simultaneously, with support for timeouts and different polling backends.
The Poller class provides efficient event monitoring for multiple sockets using platform-specific polling mechanisms.
class Poller:
    """
    Monitor several sockets at once and report which ones have pending events.

    Sockets (or raw file descriptors) are added with register(), adjusted with
    modify(), removed with unregister(), and checked with poll().
    """

    def __init__(self) -> None:
        """Create a new Poller instance."""

    def register(self, socket: Union[Socket, int], flags: int = POLLIN | POLLOUT) -> None:
        """
        Register a socket for polling.

        Parameters:
        - socket: Socket or file descriptor to monitor
        - flags: Event flags to monitor (POLLIN, POLLOUT, POLLERR); when
          omitted, both POLLIN and POLLOUT are monitored
        """

    def modify(self, socket: Union[Socket, int], flags: int) -> None:
        """
        Modify polling flags for a registered socket.

        Parameters:
        - socket: Socket or file descriptor
        - flags: New event flags to monitor for this socket
        """

    def unregister(self, socket: Union[Socket, int]) -> None:
        """
        Unregister a socket from polling.

        Parameters:
        - socket: Socket or file descriptor to unregister
        """

    def poll(self, timeout: int = -1) -> list[tuple[Socket, int]]:
        """
        Poll for events on registered sockets.

        Parameters:
        - timeout: Timeout in milliseconds (-1 for infinite, 0 for non-blocking)

        Returns:
        - list: List of (socket, events) tuples, one per socket with events;
          `events` is a bitmask to be tested with `&` against the POLL* flags
        """

Direct access to ZMQ's polling functionality for maximum control.
def zmq_poll(sockets: list[tuple[Socket, int]], timeout: int = -1) -> list[tuple[Socket, int]]:
    """
    Poll an explicit list of sockets for events.

    Unlike Poller, nothing is registered beforehand: the sockets and the flags
    to watch are passed on every call.

    Parameters:
    - sockets: List of (socket, flags) tuples to poll
    - timeout: Timeout in milliseconds (-1 for infinite)

    Returns:
    - list: List of (socket, events) tuples for sockets with events
    """

Python select-compatible interface for polling sockets.
def select(rlist: list[Socket], wlist: list[Socket], xlist: list[Socket], timeout: Optional[float] = None) -> tuple[list[Socket], list[Socket], list[Socket]]:
    """
    Select-style polling interface.

    Note that, unlike Poller.poll() and zmq_poll(), the timeout here is
    expressed in seconds rather than milliseconds.

    Parameters:
    - rlist: Sockets to check for readability
    - wlist: Sockets to check for writability
    - xlist: Sockets to check for errors
    - timeout: Timeout in seconds (None for infinite)

    Returns:
    - tuple: (readable, writable, error) socket lists
    """

import zmq
context = zmq.Context()

# Two bound endpoints: a ROUTER on port 5555 and a DEALER on port 5556
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5555")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5556")

# One poller watches both endpoints for incoming messages
poller = zmq.Poller()
for endpoint in (frontend, backend):
    poller.register(endpoint, zmq.POLLIN)

try:
    while True:
        # Wait at most 1000 ms for something to arrive
        ready = poller.poll(1000)
        if not ready:
            print("No events in 1 second")
            continue

        for sock, mask in ready:
            if not mask & zmq.POLLIN:
                continue
            if sock is frontend:
                # Forward frontend traffic to the backend
                frames = frontend.recv_multipart()
                print(f"Frontend received: {frames}")
                backend.send_multipart(frames)
            elif sock is backend:
                # Forward backend traffic to the frontend
                frames = backend.recv_multipart()
                print(f"Backend received: {frames}")
                frontend.send_multipart(frames)
except KeyboardInterrupt:
    print("Interrupted")
finally:
    for endpoint in (frontend, backend):
        endpoint.close()
    context.term()

import zmq
def create_proxy(context=None):
    """
    Forward messages between a ROUTER frontend and a DEALER backend until a
    b"TERMINATE" message arrives on the control socket.

    Parameters:
    - context: zmq.Context to create the sockets in. Defaults to the
      process-wide `zmq.Context.instance()`.

    The control socket uses the inproc transport, whose endpoints are only
    visible to sockets created from the *same* context. A context created
    privately inside this function could therefore never receive the shutdown
    message; the controller must bind "inproc://control" on the context used
    here (by default, the shared instance).

    The context is owned by the caller/controller and is not terminated here;
    only the three sockets opened by this function are closed.
    """
    if context is None:
        context = zmq.Context.instance()

    # Frontend for clients
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5559")

    # Backend for workers
    backend = context.socket(zmq.DEALER)
    backend.bind("tcp://*:5560")

    # Control socket for shutdown; subscribes to every message
    control = context.socket(zmq.SUB)
    control.connect("inproc://control")
    control.setsockopt(zmq.SUBSCRIBE, b"")

    # Poll all sockets
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
    poller.register(control, zmq.POLLIN)

    try:
        while True:
            # No timeout: block until at least one socket is readable
            for sock, mask in poller.poll():
                if not mask & zmq.POLLIN:
                    continue
                if sock is control:
                    # Shutdown signal; any other control message is ignored
                    if control.recv() == b"TERMINATE":
                        return
                elif sock is frontend:
                    # Route frontend to backend
                    backend.send_multipart(frontend.recv_multipart())
                elif sock is backend:
                    # Route backend to frontend
                    frontend.send_multipart(backend.recv_multipart())
    finally:
        frontend.close()
        backend.close()
        control.close()


create_proxy()

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt(zmq.SUBSCRIBE, b"")

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

try:
    while True:
        # timeout = 0 makes poll() return immediately
        ready = poller.poll(0)
        if not ready:
            # Nothing queued: use the time for other work
            print("No messages, doing other work...")
            time.sleep(0.1)
            continue

        # Drain whatever is available right now
        for sock, mask in ready:
            if mask & zmq.POLLIN:
                text = sock.recv_string()
                print(f"Received: {text}")
except KeyboardInterrupt:
    print("Interrupted")
finally:
    socket.close()
    context.term()

import zmq
import socket as py_socket

context = zmq.Context()

# ZMQ socket
zmq_socket = context.socket(zmq.SUB)
zmq_socket.connect("tcp://localhost:5556")
zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")

# Regular Python socket
tcp_socket = py_socket.socket(py_socket.AF_INET, py_socket.SOCK_STREAM)
tcp_socket.bind(("localhost", 8080))
tcp_socket.listen(5)
tcp_socket.setblocking(False)

# Poll both socket types
poller = zmq.Poller()
poller.register(zmq_socket, zmq.POLLIN)
poller.register(tcp_socket, zmq.POLLIN)  # Can register Python sockets too

# poll() hands back ZMQ sockets as objects, but plain sockets are reported by
# their integer file descriptor, so the listener must be matched by fd.
tcp_fd = tcp_socket.fileno()

try:
    while True:
        events = poller.poll(1000)
        for sock, event in events:
            if not event & zmq.POLLIN:
                continue
            if sock is zmq_socket:
                # Handle ZMQ message
                message = zmq_socket.recv_string()
                print(f"ZMQ message: {message}")
            elif sock == tcp_fd:
                # Handle TCP connection
                try:
                    client_sock, addr = tcp_socket.accept()
                except BlockingIOError:
                    # The pending connection went away before accept(); the
                    # listener is non-blocking, so just poll again.
                    continue
                print(f"TCP connection from {addr}")
                client_sock.close()
except KeyboardInterrupt:
    print("Interrupted")
finally:
    zmq_socket.close()
    tcp_socket.close()
    context.term()

import zmq
import time

context = zmq.Context()

# Create sockets
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:5557")
sub1 = context.socket(zmq.SUB)
sub1.connect("tcp://localhost:5557")
sub1.setsockopt(zmq.SUBSCRIBE, b"")
sub2 = context.socket(zmq.SUB)
sub2.connect("tcp://localhost:5557")
sub2.setsockopt(zmq.SUBSCRIBE, b"")

# connect() and SUBSCRIBE complete asynchronously. A PUB socket silently drops
# anything sent before a subscription has reached it ("slow joiner"), so give
# the subscribers a moment to attach before publishing.
time.sleep(0.2)

# Send some messages
pub.send_string("Hello 1")
pub.send_string("Hello 2")

# Use select-style polling (timeout is in seconds here)
readable, writable, error = zmq.select([sub1, sub2], [pub], [], timeout=1.0)
print(f"Readable sockets: {len(readable)}")
print(f"Writable sockets: {len(writable)}")
print(f"Error sockets: {len(error)}")

# Process readable sockets (one message is read from each)
for sock in readable:
    message = sock.recv_string()
    print(f"Received: {message}")

# Clean up
pub.close()
sub1.close()
sub2.close()
context.term()

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

# Send request
socket.send_string("Hello Server")
start_time = time.time()

try:
    # Poll with timeout
    events = poller.poll(5000)  # 5 second timeout
    if events:
        # Response received
        response = socket.recv_string()
        elapsed = time.time() - start_time
        print(f"Response: {response} (took {elapsed:.2f}s)")
    else:
        # Timeout occurred
        print("Request timed out after 5 seconds")
        # The REQ socket is still waiting for a reply and cannot send again,
        # so it has to be replaced. Drop it from the poller first so the
        # poller never refers to a closed socket, and discard the undelivered
        # request (linger=0) so it cannot block context.term() later.
        poller.unregister(socket)
        socket.close(linger=0)
        socket = context.socket(zmq.REQ)
        socket.connect("tcp://localhost:5555")
        poller.register(socket, zmq.POLLIN)
except KeyboardInterrupt:
    print("Interrupted")
finally:
    # linger=0: an interrupted wait may leave the request queued; do not let
    # it hold up context termination.
    socket.close(linger=0)
    context.term()

import zmq
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.connect("tcp://localhost:5555")

poller = zmq.Poller()

# Register for different event combinations (only the first one is active)
poller.register(socket, zmq.POLLIN)  # Read only
# poller.register(socket, zmq.POLLOUT)  # Write only
# poller.register(socket, zmq.POLLIN | zmq.POLLOUT)  # Read and write
# poller.register(socket, zmq.POLLERR)  # Error only

try:
    while True:
        for sock, mask in poller.poll(1000):
            # Each flag is tested independently: several may be set at once
            if mask & zmq.POLLIN:
                print("Socket is readable")
                text = sock.recv_string()
                print(f"Received: {text}")
            if mask & zmq.POLLOUT:
                print("Socket is writable")
                sock.send_string("Response")
            if mask & zmq.POLLERR:
                print("Socket has error")
                # Stops handling this batch of events; the outer loop polls again
                break
except KeyboardInterrupt:
    print("Interrupted")
finally:
    socket.close()
    context.term()

import zmq
import random

# Shared by add_socket() / remove_socket() and the polling loop
context = zmq.Context()
poller = zmq.Poller()

# Track active sockets: subscription name -> SUB socket registered with the poller
sockets = {}
def add_socket(name):
    """
    Open a SUB socket subscribed to *name* and start polling it.

    The socket is stored in the module-level `sockets` dict under *name* and
    registered with the module-level `poller` for POLLIN.
    """
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5556")
    # The subscription filter is the UTF-8 encoded name
    topic_filter = name.encode()
    subscriber.setsockopt(zmq.SUBSCRIBE, topic_filter)

    sockets[name] = subscriber
    poller.register(subscriber, zmq.POLLIN)
    print(f"Added socket: {name}")
def remove_socket(name):
    """
    Stop polling the socket stored under *name* and close it.

    Unknown names are ignored silently.
    """
    if name not in sockets:
        return

    doomed = sockets[name]
    # Unregister before closing so the poller never holds a closed socket
    poller.unregister(doomed)
    doomed.close()
    del sockets[name]
    print(f"Removed socket: {name}")
try:
    # Start with some sockets
    for initial in ("weather", "news"):
        add_socket(initial)

    while True:
        ready = poller.poll(1000)
        if ready:
            for sock, mask in ready:
                if mask & zmq.POLLIN:
                    text = sock.recv_string()
                    print(f"Received: {text}")
            continue

        # Idle second: randomly add/remove sockets for demonstration
        action = random.choice(["add", "remove", "nothing"])
        if action == "add" and len(sockets) < 5:
            candidate = f"topic_{random.randint(1, 100)}"
            if candidate not in sockets:
                add_socket(candidate)
        elif action == "remove" and len(sockets) > 1:
            victim = random.choice(list(sockets))
            remove_socket(victim)
except KeyboardInterrupt:
    print("Interrupted")
finally:
    # Clean up all sockets still being tracked
    for leftover in sockets.values():
        leftover.close()
    context.term()

import zmq
# Fragment: `sockets` is expected to be an existing iterable of sockets
# defined elsewhere; it is not created in this snippet.

# Reuse poller instance
poller = zmq.Poller()

# Register sockets once
for socket in sockets:
    poller.register(socket, zmq.POLLIN)

# Poll in tight loop
while True:
    # Use appropriate timeout
    events = poller.poll(100)  # Short timeout for responsiveness
    if not events:
        continue

    # Process events efficiently
    for socket, event in events:
        # Handle events without blocking: NOBLOCK makes recv() return
        # immediately instead of waiting for a message
        if event & zmq.POLLIN:
            message = socket.recv(zmq.NOBLOCK)

import zmq
# Fragment: `poller` and `socket` are expected to exist already.
# Unregister sockets before closing, so the poller is not left tracking a
# closed socket
poller.unregister(socket)
socket.close()
# Reuse poller instances when possible.
# Creating new pollers frequently can be expensive.

from typing import Union, List, Tuple, Optional
# Socket types for polling
PollSocket = Union[Socket, int]  # Socket or file descriptor

# Event flags
EventFlags = int  # Combination of POLLIN, POLLOUT, POLLERR (bitmask)

# Poll results
PollEvent = Tuple[Socket, int]  # (socket, events)
PollResult = List[PollEvent]  # One entry per socket that has events

# Select-style results
SelectResult = Tuple[List[Socket], List[Socket], List[Socket]]  # (readable, writable, error)

# Timeout types
PollTimeout = int  # Milliseconds (-1 for infinite)
SelectTimeout = Optional[float]  # Seconds (None for infinite)

Install with Tessl CLI:

npx tessl i tessl/pypi-pyzmq