CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pyzmq

Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library

Pending
Overview
Eval results
Files

docs/polling.md

Polling

High-performance event polling for monitoring multiple sockets simultaneously, with support for timeouts and different polling backends.

Capabilities

Poller Class

The Poller class provides efficient event monitoring for multiple sockets using platform-specific polling mechanisms.

class Poller:
    """Monitor several ZMQ sockets and native file descriptors for I/O events."""

    def __init__(self) -> None:
        """Create a new Poller instance."""

    def register(self, socket: Union[Socket, int], flags: int = POLLIN | POLLOUT) -> None:
        """
        Register a socket for polling.

        Parameters:
        - socket: Socket or file descriptor to monitor
        - flags: Bitmask of event flags to monitor (POLLIN, POLLOUT, POLLERR);
          defaults to POLLIN | POLLOUT
        """

    def modify(self, socket: Union[Socket, int], flags: int) -> None:
        """
        Modify polling flags for a registered socket.

        Parameters:
        - socket: Socket or file descriptor that was previously registered
        - flags: New event flags (replaces the old bitmask)
        """

    def unregister(self, socket: Union[Socket, int]) -> None:
        """
        Unregister a socket from polling.

        Parameters:
        - socket: Socket or file descriptor to unregister

        Raises:
        - KeyError: If the socket was never registered
        """

    def poll(self, timeout: int = -1) -> list[tuple[Union[Socket, int], int]]:
        """
        Poll for events on registered sockets.

        Parameters:
        - timeout: Timeout in milliseconds (-1 for infinite, 0 for non-blocking)

        Returns:
        - list: List of (socket, events) tuples for sockets with events.
          ZMQ sockets are returned as the registered Socket object; native
          sockets and raw file descriptors are returned as their integer fd,
          so match those with ``== sock.fileno()`` rather than ``is``.
          ``dict(poller.poll())`` gives a convenient socket -> events mapping.
        """

Low-Level Polling

Direct access to ZMQ's polling functionality for maximum control.

def zmq_poll(sockets: list[tuple[Socket, int]], timeout: int = -1) -> list[tuple[Socket, int]]:
    """
    Poll sockets for events without creating a Poller object.

    Parameters:
    - sockets: List of (socket, flags) tuples to poll; flags is the bitmask
      of events to watch for on that socket
    - timeout: Timeout in milliseconds (-1 for infinite)

    Returns:
    - list: List of (socket, events) tuples for sockets with events

    NOTE(review): Poller.register also accepts integer file descriptors;
    confirm whether entries here (and in the result) may be ints as well.
    """

Select-Style Interface

Python select-compatible interface for polling sockets.

def select(rlist: list[Socket], wlist: list[Socket], xlist: list[Socket], timeout: Optional[float] = None) -> tuple[list[Socket], list[Socket], list[Socket]]:
    """
    Select-style polling interface.

    Note that, unlike Poller.poll, the timeout here is given in seconds.

    Parameters:
    - rlist: Sockets to check for readability
    - wlist: Sockets to check for writability
    - xlist: Sockets to check for errors
    - timeout: Timeout in seconds (None for infinite)

    Returns:
    - tuple: (readable, writable, error) socket lists
    """

Usage Examples

Basic Polling

import zmq

context = zmq.Context()

# ROUTER socket on the client-facing side, DEALER on the worker-facing side
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5555")

backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5556")

# A single poller watches both sockets for incoming messages
poller = zmq.Poller()
for endpoint in (frontend, backend):
    poller.register(endpoint, zmq.POLLIN)

try:
    while True:
        # Wait up to 1000 ms; the mapping is empty if nothing arrived
        ready = dict(poller.poll(1000))

        if not ready:
            print("No events in 1 second")
            continue

        if ready.get(frontend, 0) & zmq.POLLIN:
            # Frontend traffic is relayed to the backend
            message = frontend.recv_multipart()
            print(f"Frontend received: {message}")
            backend.send_multipart(message)

        if ready.get(backend, 0) & zmq.POLLIN:
            # Backend traffic is relayed to the frontend
            message = backend.recv_multipart()
            print(f"Backend received: {message}")
            frontend.send_multipart(message)

except KeyboardInterrupt:
    print("Interrupted")
finally:
    # Close sockets before terminating the context
    frontend.close()
    backend.close()
    context.term()

Proxy with Polling

import zmq

def create_proxy():
    """
    Run a ROUTER/DEALER proxy until a shutdown message arrives.

    Messages received on the frontend (tcp://*:5559) are forwarded to the
    backend (tcp://*:5560) and vice versa. A SUB socket connected to
    inproc://control is polled alongside them; receiving b"TERMINATE" on it
    makes the function return.

    The context and all sockets are managed by ``with`` blocks, so they are
    closed/terminated even if a bind() or connect() fails part-way through
    setup.
    """
    with zmq.Context() as context:
        with context.socket(zmq.ROUTER) as frontend, \
                context.socket(zmq.DEALER) as backend, \
                context.socket(zmq.SUB) as control:
            # Frontend for clients
            frontend.bind("tcp://*:5559")

            # Backend for workers
            backend.bind("tcp://*:5560")

            # Control socket for shutdown; subscribe to every message
            control.connect("inproc://control")
            control.setsockopt(zmq.SUBSCRIBE, b"")

            # Poll all sockets
            poller = zmq.Poller()
            poller.register(frontend, zmq.POLLIN)
            poller.register(backend, zmq.POLLIN)
            poller.register(control, zmq.POLLIN)

            while True:
                # No timeout: block until at least one socket is readable
                events = poller.poll()

                for socket, event in events:
                    if socket is control and event & zmq.POLLIN:
                        # Shutdown signal
                        message = control.recv()
                        if message == b"TERMINATE":
                            return

                    elif socket is frontend and event & zmq.POLLIN:
                        # Route frontend to backend
                        message = frontend.recv_multipart()
                        backend.send_multipart(message)

                    elif socket is backend and event & zmq.POLLIN:
                        # Route backend to frontend
                        message = backend.recv_multipart()
                        frontend.send_multipart(message)

# Blocks until b"TERMINATE" is received on the control socket (or an exception is raised)
create_proxy()

Non-Blocking Polling

import zmq
import time

context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, b"")

poller = zmq.Poller()
poller.register(subscriber, zmq.POLLIN)

try:
    while True:
        # A timeout of 0 makes poll() return immediately
        ready = poller.poll(0)

        if not ready:
            # Nothing waiting: spend the time on other work instead
            print("No messages, doing other work...")
            time.sleep(0.1)
            continue

        # Drain whatever the poller reported as readable
        for sock, mask in ready:
            if mask & zmq.POLLIN:
                message = sock.recv_string()
                print(f"Received: {message}")

except KeyboardInterrupt:
    print("Interrupted")
finally:
    subscriber.close()
    context.term()

Mixed Socket Types

import zmq
import socket as py_socket

context = zmq.Context()

# ZMQ socket
zmq_socket = context.socket(zmq.SUB)
zmq_socket.connect("tcp://localhost:5556")
zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")

# Regular Python socket
tcp_socket = py_socket.socket(py_socket.AF_INET, py_socket.SOCK_STREAM)
tcp_socket.bind(("localhost", 8080))
tcp_socket.listen(5)
tcp_socket.setblocking(False)

# poll() reports native sockets by their integer file descriptor, not by the
# Python socket object, so remember the fd to recognise it in the results
tcp_fd = tcp_socket.fileno()

# Poll both socket types
poller = zmq.Poller()
poller.register(zmq_socket, zmq.POLLIN)
poller.register(tcp_socket, zmq.POLLIN)  # Can register Python sockets too

try:
    while True:
        events = poller.poll(1000)

        for sock, event in events:
            if sock is zmq_socket and event & zmq.POLLIN:
                # Handle ZMQ message
                message = zmq_socket.recv_string()
                print(f"ZMQ message: {message}")

            elif sock == tcp_fd and event & zmq.POLLIN:
                # Handle TCP connection (listening socket is readable when a
                # client is waiting to be accepted)
                try:
                    client_sock, addr = tcp_socket.accept()
                    print(f"TCP connection from {addr}")
                    client_sock.close()
                except BlockingIOError:
                    # The pending connection went away before accept(); retry
                    # on the next poll
                    pass

except KeyboardInterrupt:
    print("Interrupted")
finally:
    zmq_socket.close()
    tcp_socket.close()
    context.term()

Select-Style Polling

import time

import zmq

context = zmq.Context()

# Create sockets
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:5557")

sub1 = context.socket(zmq.SUB)
sub1.connect("tcp://localhost:5557")
sub1.setsockopt(zmq.SUBSCRIBE, b"")

sub2 = context.socket(zmq.SUB)
sub2.connect("tcp://localhost:5557")
sub2.setsockopt(zmq.SUBSCRIBE, b"")

# PUB silently drops messages for subscribers whose connection/subscription
# is not established yet ("slow joiner"), so give the SUB sockets a moment
# to finish connecting before publishing
time.sleep(0.2)

# Send some messages
pub.send_string("Hello 1")
pub.send_string("Hello 2")

# Use select-style polling (timeout is in seconds here)
readable, writable, error = zmq.select([sub1, sub2], [pub], [], timeout=1.0)

print(f"Readable sockets: {len(readable)}")
print(f"Writable sockets: {len(writable)}")
print(f"Error sockets: {len(error)}")

# Process readable sockets (one message per readable socket)
for sock in readable:
    message = sock.recv_string()
    print(f"Received: {message}")

# Clean up
pub.close()
sub1.close()
sub2.close()
context.term()

Timeout Handling

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

# Send request
socket.send_string("Hello Server")
start_time = time.time()

try:
    # Poll with timeout
    events = poller.poll(5000)  # 5 second timeout

    if events:
        # Response received
        response = socket.recv_string()
        elapsed = time.time() - start_time
        print(f"Response: {response} (took {elapsed:.2f}s)")
    else:
        # Timeout occurred
        print("Request timed out after 5 seconds")

        # A REQ socket that never got its reply cannot send again, so it has
        # to be replaced. Stop polling the old socket and close it with
        # linger=0 so the undelivered request is discarded instead of
        # blocking context.term() later.
        poller.unregister(socket)
        socket.close(linger=0)

        socket = context.socket(zmq.REQ)
        socket.connect("tcp://localhost:5555")
        # The poller must watch the replacement socket, not the closed one
        poller.register(socket, zmq.POLLIN)

except KeyboardInterrupt:
    print("Interrupted")
finally:
    # linger=0: do not wait for a request that may never be delivered
    socket.close(linger=0)
    context.term()

Event Flag Combinations

import zmq

context = zmq.Context()
dealer = context.socket(zmq.DEALER)
dealer.connect("tcp://localhost:5555")

poller = zmq.Poller()

# The flags argument is a bitmask; pick the registration that fits
poller.register(dealer, zmq.POLLIN)          # Read only
# poller.register(dealer, zmq.POLLOUT)       # Write only
# poller.register(dealer, zmq.POLLIN | zmq.POLLOUT)  # Read and write
# poller.register(dealer, zmq.POLLERR)       # Error only

try:
    while True:
        # Each returned mask may have several bits set, so test them one by one
        for ready, mask in poller.poll(1000):
            if mask & zmq.POLLIN:
                print("Socket is readable")
                message = ready.recv_string()
                print(f"Received: {message}")

            if mask & zmq.POLLOUT:
                print("Socket is writable")
                ready.send_string("Response")

            if mask & zmq.POLLERR:
                print("Socket has error")
                # Leaves the per-event loop only; polling resumes afterwards
                break

except KeyboardInterrupt:
    print("Interrupted")
finally:
    dealer.close()
    context.term()

Dynamic Socket Registration

import zmq
import random

# Shared state used by add_socket(), remove_socket() and the polling loop
context = zmq.Context()
poller = zmq.Poller()

# Track active sockets: subscription name -> SUB socket
sockets = {}

def add_socket(name):
    """Create a SUB socket subscribed to *name*, track it and start polling it."""
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5556")
    # The subscription filter is the name encoded to bytes
    topic_filter = name.encode()
    subscriber.setsockopt(zmq.SUBSCRIBE, topic_filter)

    sockets[name] = subscriber
    poller.register(subscriber, zmq.POLLIN)
    print(f"Added socket: {name}")

def remove_socket(name):
    """Stop polling the socket tracked under *name*, close it and forget it.

    Does nothing when no socket is tracked under that name.
    """
    if name not in sockets:
        return

    doomed = sockets[name]
    # Unregister first, then close, then drop the bookkeeping entry
    poller.unregister(doomed)
    doomed.close()
    del sockets[name]
    print(f"Removed socket: {name}")

try:
    # Seed the poller with two subscriptions
    for initial_topic in ("weather", "news"):
        add_socket(initial_topic)

    while True:
        ready = poller.poll(1000)

        if ready:
            # Print one message from every socket reported readable
            for sock, mask in ready:
                if mask & zmq.POLLIN:
                    message = sock.recv_string()
                    print(f"Received: {message}")
            continue

        # Idle second: randomly add/remove sockets for demonstration
        action = random.choice(["add", "remove", "nothing"])

        if action == "add" and len(sockets) < 5:
            name = f"topic_{random.randint(1, 100)}"
            if name not in sockets:
                add_socket(name)

        elif action == "remove" and len(sockets) > 1:
            name = random.choice(list(sockets.keys()))
            remove_socket(name)

except KeyboardInterrupt:
    print("Interrupted")
finally:
    # Close every socket that is still tracked, then the context
    for leftover in sockets.values():
        leftover.close()
    context.term()

Performance Tips

Efficient Polling Patterns

import zmq

# One poller instance is created up front and reused for every poll
poller = zmq.Poller()

# Registration happens a single time, outside the loop
for sock in sockets:
    poller.register(sock, zmq.POLLIN)

# Poll in tight loop
while True:
    # 100 ms: a short timeout keeps the loop responsive
    ready = poller.poll(100)

    if ready:
        for sock, mask in ready:
            # Non-blocking receive so event handling never stalls the loop
            if mask & zmq.POLLIN:
                message = sock.recv(zmq.NOBLOCK)

Memory Management

import zmq

# Unregister sockets before closing
# NOTE(review): presumably so the poller never polls a closed socket — confirm
poller.unregister(socket)
socket.close()

# Reuse poller instances when possible
# Creating new pollers frequently can be expensive

Types

from typing import Union, List, Tuple, Optional

# Socket types for polling
PollSocket = Union[Socket, int]  # Socket or file descriptor

# Event flags
EventFlags = int  # Combination of POLLIN, POLLOUT, POLLERR

# Poll results
# The first element is the Socket object for ZMQ sockets, or the integer fd
# for native sockets / raw file descriptors that were registered.
PollEvent = Tuple[PollSocket, int]  # (socket or fd, events)
PollResult = List[PollEvent]

# Select-style results
SelectResult = Tuple[List[Socket], List[Socket], List[Socket]]  # (readable, writable, error)

# Timeout types
PollTimeout = int      # Milliseconds (-1 for infinite)
SelectTimeout = Optional[float]  # Seconds (None for infinite)

Install with Tessl CLI

npx tessl i tessl/pypi-pyzmq

docs

async-support.md

authentication.md

constants.md

core-messaging.md

devices.md

error-handling.md

index.md

message-handling.md

polling.md

tile.json