CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pyzmq

Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library

Pending
Overview
Eval results
Files

docs/devices.md

Devices

Background devices for message routing including proxy, queue, forwarder, and steerable proxy implementations with monitoring capabilities.

Capabilities

Proxy Functions

Built-in proxy functions for message routing between sockets.

def proxy(frontend: Socket, backend: Socket, capture: Socket = None) -> None:
    """
    Forward messages between a frontend and a backend socket.

    The call blocks the calling thread while the proxy runs; the usage
    examples in this document stop it with KeyboardInterrupt.

    Parameters:
    - frontend: Socket facing clients (e.g., ROUTER)
    - backend: Socket facing workers (e.g., DEALER)
    - capture: Optional socket that receives the messages passing through
      the proxy; None (the default) disables capture

    Returns:
    - None
    """

def proxy_steerable(frontend: Socket, backend: Socket, capture: Socket = None, control: Socket = None) -> None:
    """
    Proxy that can additionally be steered at runtime through a control socket.

    Like proxy(), this call blocks while the proxy runs.

    Parameters:
    - frontend: Frontend socket
    - backend: Backend socket
    - capture: Optional capture socket; None disables capture
    - control: Socket on which the proxy receives the command strings
      "PAUSE", "RESUME" and "TERMINATE"

    Returns:
    - None
    """

Device Base Classes

Base classes for creating custom background devices.

class Device:
    # NOTE(review): upstream pyzmq's zmq.devices.Device appears to take socket
    # *types* (in_type, out_type) and to create the sockets itself, rather than
    # accepting socket objects — confirm this signature against the pyzmq
    # API reference before relying on it.
    def __init__(self, device_type: int, frontend: Socket, backend: Socket) -> None:
        """
        Base class for devices that shuttle messages between two sockets.

        Parameters:
        - device_type: Device type constant (QUEUE, FORWARDER, STREAMER)
        - frontend: Frontend socket
        - backend: Backend socket
        """

    def run(self) -> None:
        """Run the device in the calling thread (blocking)."""

class ThreadDevice(Device):
    # Same constructor arguments as Device; the difference is that start()
    # runs the device in a background thread instead of blocking the caller.
    def __init__(self, device_type: int, frontend: Socket, backend: Socket) -> None:
        """Thread-based device that runs in background."""

    def start(self) -> None:
        """Start the device in a background thread and return immediately."""

    def join(self, timeout: float = None) -> None:
        """
        Wait for the device thread to terminate.

        Parameters:
        - timeout: Maximum time to wait, in seconds (None waits indefinitely)
        """

    @property
    def done(self) -> bool:
        """True if device has finished running."""

Monitored Queue Device

Queue device with monitoring capabilities.

class MonitoredQueue:
    # NOTE(review): upstream pyzmq's MonitoredQueue appears to be built from
    # socket types plus in/out prefixes rather than socket objects — confirm
    # against the zmq.devices API reference.
    def __init__(self, in_socket: Socket, out_socket: Socket, mon_socket: Socket) -> None:
        """
        Create a queue device that also reports its traffic on a monitor socket.

        Parameters:
        - in_socket: Input socket
        - out_socket: Output socket
        - mon_socket: Monitoring socket
        """

    def run(self) -> None:
        """Run the monitored queue in the calling thread (blocking)."""

class MonitoredQueueDevice(ThreadDevice):
    # NOTE(review): the background-thread monitored queue in upstream pyzmq
    # appears to be named ThreadMonitoredQueue — verify this class name.
    def __init__(self, in_socket: Socket, out_socket: Socket, mon_socket: Socket) -> None:
        """Monitored queue that runs in a background thread (see ThreadDevice)."""

Proxy Device Classes

High-level proxy device implementations.

class ProxyDevice(ThreadDevice):
    # NOTE(review): the background-thread proxy in upstream pyzmq appears to be
    # named ThreadProxy and to take socket types — verify this class name and
    # signature against the zmq.devices API reference.
    def __init__(self, frontend: Socket, backend: Socket, capture: Socket = None) -> None:
        """
        Proxy that runs in a background thread.

        Parameters:
        - frontend: Frontend socket
        - backend: Backend socket
        - capture: Optional capture socket; None disables capture
        """

class ProxySteerableDevice(ThreadDevice):  
    # NOTE(review): upstream pyzmq appears to expose this as
    # ThreadProxySteerable, and pause()/resume()/terminate() helpers may not
    # exist there — confirm before documenting them as public API.
    def __init__(self, frontend: Socket, backend: Socket, capture: Socket = None, control: Socket = None) -> None:
        """
        Background-thread proxy that can be steered through a control socket.

        Parameters:
        - frontend: Frontend socket
        - backend: Backend socket
        - capture: Optional capture socket; None disables capture
        - control: Control socket for management
        """

    def pause(self) -> None:
        """Pause message forwarding."""

    def resume(self) -> None:
        """Resume message forwarding after pause()."""

    def terminate(self) -> None:
        """Terminate the proxy device."""

Usage Examples

Basic Proxy

import zmq

# Context managers guarantee cleanup even when socket creation or bind()
# fails part-way through setup (e.g. the port is already in use): the
# sockets are closed first, then the context is terminated.
with zmq.Context() as context:
    with context.socket(zmq.ROUTER) as frontend, \
            context.socket(zmq.DEALER) as backend:
        # Frontend faces clients, backend faces workers
        frontend.bind("tcp://*:5559")
        backend.bind("tcp://*:5560")

        try:
            # Run proxy (blocking)
            zmq.proxy(frontend, backend)
        except KeyboardInterrupt:
            print("Proxy interrupted")

Proxy with Message Capture

import zmq

# Context managers guarantee cleanup even when socket creation or bind()
# fails part-way through setup: the sockets are closed first, then the
# context is terminated.
with zmq.Context() as context:
    with context.socket(zmq.ROUTER) as frontend, \
            context.socket(zmq.DEALER) as backend, \
            context.socket(zmq.PUB) as capture:
        frontend.bind("tcp://*:5559")
        backend.bind("tcp://*:5560")

        # Capture socket for monitoring: subscribers connect here
        capture.bind("tcp://*:5561")

        try:
            # All messages will be captured and published
            zmq.proxy(frontend, backend, capture)
        except KeyboardInterrupt:
            print("Proxy with capture interrupted")

Steerable Proxy

import zmq
import threading
import time

def control_proxy(context):
    """
    Drive the steerable proxy: pause, resume, then terminate it.

    Parameters:
    - context: The SAME zmq.Context that owns the proxy's control socket.
      inproc:// endpoints are only visible inside one context, so a
      separate context here would never reach the proxy.
    """
    control = context.socket(zmq.REQ)
    control.connect("inproc://control")

    # (command sent to the proxy, message printed once it is acknowledged)
    steps = [
        ("PAUSE", "Proxy paused"),
        ("RESUME", "Proxy resumed"),
        ("TERMINATE", "Proxy terminated"),
    ]
    try:
        for command, confirmation in steps:
            time.sleep(2)
            control.send_string(command)
            # REQ sockets must read the reply before sending the next request
            control.recv_string()
            print(confirmation)
    finally:
        # Close only the socket; the shared context is terminated by main()
        control.close()

def main():
    """Run a steerable ROUTER/DEALER proxy controlled from a helper thread."""
    context = zmq.Context()

    # Create proxy sockets
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5559")

    backend = context.socket(zmq.DEALER)
    backend.bind("tcp://*:5560")

    # Control socket: bound before the control thread starts connecting
    control = context.socket(zmq.REP)
    control.bind("inproc://control")

    # Share the context so the inproc:// endpoint is reachable from the thread
    control_thread = threading.Thread(target=control_proxy, args=(context,))
    control_thread.start()

    try:
        # Run steerable proxy; returns after the TERMINATE command
        zmq.proxy_steerable(frontend, backend, None, control)
    finally:
        control_thread.join()
        frontend.close()
        backend.close()
        control.close()
        context.term()

main()

Thread-based Proxy Device

import time

import zmq
from zmq.devices import ThreadProxy

# A background device owns its sockets: it is configured with socket *types*
# and endpoints, and creates/binds the sockets inside its own thread when
# started. This avoids sharing a socket between threads.
device = ThreadProxy(zmq.ROUTER, zmq.DEALER)
device.bind_in("tcp://*:5559")   # frontend, for clients
device.bind_out("tcp://*:5560")  # backend, for workers

try:
    # Start proxy in background thread
    device.start()
    print("Proxy started in background")

    # Do other work while proxy runs
    for i in range(10):
        print(f"Main thread working... {i}")
        time.sleep(1)

    # The proxy keeps running, so this join only waits for the timeout
    device.join(timeout=1.0)

except KeyboardInterrupt:
    print("Main interrupted")
# No explicit cleanup: the main thread created no sockets of its own, and the
# device thread is a daemon by default, so it ends with the interpreter.

Monitored Queue

import time

import zmq
from zmq.devices import ThreadMonitoredQueue

# The device creates its own sockets from these types when it starts.
# Each message it forwards is also sent to the monitor socket, prefixed
# with b"in" or b"out" to mark which side it was received on.
device = ThreadMonitoredQueue(zmq.PULL, zmq.PUSH, zmq.PUB, b"in", b"out")
device.bind_in("tcp://*:5557")
device.bind_out("tcp://*:5558")
device.bind_mon("tcp://*:5559")

# Created before the try block so the finally clause can always close them
context = zmq.Context()
monitor_client = context.socket(zmq.SUB)

try:
    device.start()
    print("Monitored queue started")

    # Monitor the queue
    monitor_client.connect("tcp://localhost:5559")
    monitor_client.setsockopt(zmq.SUBSCRIBE, b"")

    # Process monitoring messages
    while True:
        try:
            # Monitor messages are multipart: [prefix, *original frames]
            message = monitor_client.recv_multipart(zmq.NOBLOCK)
            print(f"Monitor: {message}")
        except zmq.Again:
            # Nothing pending; back off briefly instead of spinning
            time.sleep(0.1)

except KeyboardInterrupt:
    print("Monitored queue interrupted")
finally:
    # Do not join() the device here: it runs indefinitely, so join() would
    # never return. Its thread is a daemon and ends with the interpreter.
    monitor_client.close()
    context.term()

Custom Device

import zmq
from zmq.devices import Device
import json
import time

class LoggingDevice(Device):
    """Custom device that forwards messages both ways and logs each one."""

    def __init__(self, frontend, backend, log_file="messages.log"):
        """
        Parameters:
        - frontend: Socket facing clients
        - backend: Socket facing workers
        - log_file: Path of the JSON-lines log file (overwritten on each run)
        """
        super().__init__(zmq.QUEUE, frontend, backend)
        # run() relies on these attributes; the base class does not define
        # them, so keep explicit references to the sockets.
        self.frontend_socket = frontend
        self.backend_socket = backend
        self.log_file = log_file

    def _relay(self, source, target, direction, log):
        """Forward one multipart message from source to target and log it."""
        message = source.recv_multipart()
        target.send_multipart(message)

        log_entry = {
            'timestamp': time.time(),
            'direction': direction,
            # Frames are raw bytes; undecodable bytes are dropped for logging
            'message': [part.decode('utf-8', errors='ignore') for part in message]
        }
        log.write(json.dumps(log_entry) + '\n')
        # Flush per message so the log is usable while the device is running
        log.flush()

    def run(self):
        """Forward messages in both directions until interrupted (blocking)."""
        poller = zmq.Poller()
        poller.register(self.frontend_socket, zmq.POLLIN)
        poller.register(self.backend_socket, zmq.POLLIN)

        with open(self.log_file, 'w', encoding='utf-8') as log:
            while True:
                for socket, event in poller.poll():
                    if not event & zmq.POLLIN:
                        continue
                    if socket is self.frontend_socket:
                        self._relay(socket, self.backend_socket,
                                    'frontend->backend', log)
                    elif socket is self.backend_socket:
                        self._relay(socket, self.frontend_socket,
                                    'backend->frontend', log)

# Usage
# Context managers guarantee cleanup even when socket creation or bind()
# fails part-way through setup: the sockets are closed first, then the
# context is terminated.
with zmq.Context() as context:
    with context.socket(zmq.ROUTER) as frontend, \
            context.socket(zmq.DEALER) as backend:
        frontend.bind("tcp://*:5559")
        backend.bind("tcp://*:5560")

        device = LoggingDevice(frontend, backend, "proxy.log")

        try:
            # Blocks in this thread, forwarding and logging messages
            device.run()
        except KeyboardInterrupt:
            print("Logging device interrupted")

Load Balancer Device

import zmq
from zmq.devices import ThreadDevice
import random

class LoadBalancerDevice(ThreadDevice):
    """Load balancer that distributes work across multiple backends"""

    def __init__(self, frontend, backends):
        """
        Parameters:
        - frontend: Socket that receives client requests
        - backends: Non-empty list of sockets to spread requests over
        """
        # Use first backend as representative
        super().__init__(zmq.QUEUE, frontend, backends[0])
        # run() relies on this attribute; the base class does not define it,
        # so keep an explicit reference to the frontend socket.
        self.frontend_socket = frontend
        self.backends = backends

    def run(self):
        """Route each request to a random backend and relay replies back."""
        # One poller over every socket: the loop sleeps until any socket is
        # readable instead of alternating between two short-timeout polls.
        poller = zmq.Poller()
        poller.register(self.frontend_socket, zmq.POLLIN)
        for backend in self.backends:
            poller.register(backend, zmq.POLLIN)

        while True:
            for socket, event in poller.poll():
                if not event & zmq.POLLIN:
                    continue

                if socket is self.frontend_socket:
                    message = socket.recv_multipart()
                    # With no backends the request is dropped, as before
                    if self.backends:
                        backend = random.choice(self.backends)
                        backend.send_multipart(message)
                        print(f"Sent to backend {self.backends.index(backend)}")
                else:
                    response = socket.recv_multipart()
                    self.frontend_socket.send_multipart(response)
                    print(f"Response from backend {self.backends.index(socket)}")

# Usage
context = zmq.Context()
frontend = None
backends = []

# Setup happens inside the try block so that a failure part-way through
# (e.g. one port already in use) still closes whatever was created.
try:
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5559")

    # Multiple backend sockets on consecutive ports
    for i in range(3):
        backend = context.socket(zmq.DEALER)
        backends.append(backend)
        backend.bind(f"tcp://*:{5560 + i}")

    device = LoadBalancerDevice(frontend, backends)
    device.start()
    print("Load balancer started with 3 backends")

    # Keep main thread alive
    device.join()

except KeyboardInterrupt:
    print("Load balancer interrupted")
finally:
    if frontend is not None:
        frontend.close()
    for backend in backends:
        backend.close()
    context.term()

Device with Statistics

import zmq
from zmq.devices import Device
import time
import threading

class StatisticsDevice(Device):
    """Device that forwards messages and tracks message statistics"""

    def __init__(self, frontend, backend):
        """
        Parameters:
        - frontend: Socket facing clients
        - backend: Socket facing workers
        """
        super().__init__(zmq.QUEUE, frontend, backend)
        # run() relies on these attributes; the base class does not define
        # them, so keep explicit references to the sockets.
        self.frontend_socket = frontend
        self.backend_socket = backend
        self.stats = {
            'messages_forwarded': 0,
            'bytes_forwarded': 0,
            'start_time': time.time(),
            'last_message_time': 0
        }
        # Guards self.stats: run() updates it while other threads read it
        self.stats_lock = threading.Lock()

    def get_stats(self):
        """
        Return a snapshot of the counters plus derived rates.

        Returns:
        - dict with the raw counters and 'runtime_seconds',
          'messages_per_second' and 'bytes_per_second'
        """
        with self.stats_lock:
            runtime = time.time() - self.stats['start_time']
            # Clamp the divisor to 1s so early calls don't report huge rates
            elapsed = max(runtime, 1)
            return {
                **self.stats,
                'runtime_seconds': runtime,
                'messages_per_second': self.stats['messages_forwarded'] / elapsed,
                'bytes_per_second': self.stats['bytes_forwarded'] / elapsed
            }

    def run(self):
        """Forward messages in both directions, updating statistics (blocking)."""
        poller = zmq.Poller()
        poller.register(self.frontend_socket, zmq.POLLIN)
        poller.register(self.backend_socket, zmq.POLLIN)

        while True:
            for socket, event in poller.poll():
                if not event & zmq.POLLIN:
                    continue

                message = socket.recv_multipart()

                # Update statistics
                with self.stats_lock:
                    self.stats['messages_forwarded'] += 1
                    self.stats['bytes_forwarded'] += sum(len(part) for part in message)
                    self.stats['last_message_time'] = time.time()

                # Forward to the opposite side
                if socket is self.frontend_socket:
                    self.backend_socket.send_multipart(message)
                else:
                    self.frontend_socket.send_multipart(message)

# Usage with statistics reporting
def print_stats(device):
    """Report the device's statistics every five seconds, forever."""
    report_interval = 5
    while True:
        time.sleep(report_interval)
        snapshot = device.get_stats()
        forwarded = snapshot['messages_forwarded']
        rate = snapshot['messages_per_second']
        total_bytes = snapshot['bytes_forwarded']
        print(f"Messages: {forwarded}, Rate: {rate:.1f} msg/s, Bytes: {total_bytes}")

# Context managers guarantee cleanup even when socket creation or bind()
# fails part-way through setup: the sockets are closed first, then the
# context is terminated.
with zmq.Context() as context:
    with context.socket(zmq.ROUTER) as frontend, \
            context.socket(zmq.DEALER) as backend:
        frontend.bind("tcp://*:5559")
        backend.bind("tcp://*:5560")

        device = StatisticsDevice(frontend, backend)

        # Start statistics reporter; daemon so it never keeps the process alive
        stats_thread = threading.Thread(target=print_stats, args=(device,))
        stats_thread.daemon = True
        stats_thread.start()

        try:
            device.run()
        except KeyboardInterrupt:
            print("\nFinal statistics:")
            stats = device.get_stats()
            for key, value in stats.items():
                print(f"  {key}: {value}")

Device Types

PyZMQ supports several predefined device types:

# Device type constants (the values accepted as a device's `device_type`)
zmq.QUEUE      # Load-balancing queue device
zmq.FORWARDER  # Message forwarder device  
zmq.STREAMER   # Message streamer device

Control Commands

Steerable proxy devices accept these control commands:

# Control commands for steerable proxy; each is sent as a message on the
# control socket (see the Steerable Proxy example's send_string calls)
"PAUSE"     # Pause message forwarding
"RESUME"    # Resume message forwarding  
"TERMINATE" # Terminate the proxy
"STATISTICS" # Get proxy statistics (if supported)

Types

from typing import Optional, Dict, Any, List
import threading

# The socket aliases below reference Socket, so it must be in scope
from zmq import Socket

# Device types
DeviceType = int  # QUEUE, FORWARDER, STREAMER constants

# Socket types for devices
DeviceSocket = Socket
FrontendSocket = Socket
BackendSocket = Socket
CaptureSocket = Optional[Socket]  # None means "no capture"
ControlSocket = Optional[Socket]  # None means "not steerable"
MonitorSocket = Socket

# Statistics types
DeviceStats = Dict[str, Any]
MessageCount = int
ByteCount = int
Timestamp = float  # seconds, as returned by time.time()

# Thread types
DeviceThread = threading.Thread
JoinTimeout = Optional[float]  # seconds; None waits indefinitely

# Control types
ControlCommand = str  # "PAUSE", "RESUME", "TERMINATE", etc.

Install with Tessl CLI

npx tessl i tessl/pypi-pyzmq

docs

async-support.md

authentication.md

constants.md

core-messaging.md

devices.md

error-handling.md

index.md

message-handling.md

polling.md

tile.json