Python bindings for ZeroMQ (ØMQ), a lightweight and fast messaging library
—
Background devices for message routing including proxy, queue, forwarder, and steerable proxy implementations with monitoring capabilities.
Built-in proxy functions for message routing between sockets.
def proxy(frontend: Socket, backend: Socket, capture: Socket = None) -> None:
"""
Simple proxy connecting frontend and backend sockets.
Parameters:
- frontend: Frontend socket (e.g., ROUTER for clients)
- backend: Backend socket (e.g., DEALER for workers)
- capture: Optional socket to capture all messages
"""
def proxy_steerable(frontend: Socket, backend: Socket, capture: Socket = None, control: Socket = None) -> None:
"""
Steerable proxy with control socket for runtime management.
Parameters:
- frontend: Frontend socket
- backend: Backend socket
- capture: Optional capture socket
- control: Control socket for PAUSE/RESUME/TERMINATE commands
"""Base classes for creating custom background devices.
class Device:
def __init__(self, device_type: int, frontend: Socket, backend: Socket) -> None:
"""
Base device class.
Parameters:
- device_type: Device type constant (QUEUE, FORWARDER, STREAMER)
- frontend: Frontend socket
- backend: Backend socket
"""
def run(self) -> None:
"""Run the device (blocking)."""
class ThreadDevice(Device):
def __init__(self, device_type: int, frontend: Socket, backend: Socket) -> None:
"""Thread-based device that runs in background."""
def start(self) -> None:
"""Start the device in a background thread."""
def join(self, timeout: float = None) -> None:
"""
Wait for device thread to terminate.
Parameters:
- timeout: Timeout in seconds (None for infinite)
"""
@property
def done(self) -> bool:
"""True if device has finished running."""Queue device with monitoring capabilities.
class MonitoredQueue:
def __init__(self, in_socket: Socket, out_socket: Socket, mon_socket: Socket) -> None:
"""
Create a monitored queue device.
Parameters:
- in_socket: Input socket
- out_socket: Output socket
- mon_socket: Monitoring socket
"""
def run(self) -> None:
"""Run the monitored queue (blocking)."""
class MonitoredQueueDevice(ThreadDevice):
def __init__(self, in_socket: Socket, out_socket: Socket, mon_socket: Socket) -> None:
"""Thread-based monitored queue device."""High-level proxy device implementations.
class ProxyDevice(ThreadDevice):
def __init__(self, frontend: Socket, backend: Socket, capture: Socket = None) -> None:
"""
Proxy device running in background thread.
Parameters:
- frontend: Frontend socket
- backend: Backend socket
- capture: Optional capture socket
"""
class ProxySteerableDevice(ThreadDevice):
def __init__(self, frontend: Socket, backend: Socket, capture: Socket = None, control: Socket = None) -> None:
"""
Steerable proxy device with control interface.
Parameters:
- frontend: Frontend socket
- backend: Backend socket
- capture: Optional capture socket
- control: Control socket for management
"""
def pause(self) -> None:
"""Pause message forwarding."""
def resume(self) -> None:
"""Resume message forwarding."""
def terminate(self) -> None:
"""Terminate the proxy device."""import zmq
context = zmq.Context()
# Create frontend for clients
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# Create backend for workers
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")
try:
# Run proxy (blocking)
zmq.proxy(frontend, backend)
except KeyboardInterrupt:
print("Proxy interrupted")
finally:
frontend.close()
backend.close()
context.term()import zmq
context = zmq.Context()
# Create sockets
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")
# Capture socket for monitoring
capture = context.socket(zmq.PUB)
capture.bind("tcp://*:5561")
try:
# All messages will be captured and published
zmq.proxy(frontend, backend, capture)
except KeyboardInterrupt:
print("Proxy with capture interrupted")
finally:
frontend.close()
backend.close()
capture.close()
context.term()import zmq
import threading
import time
def control_proxy():
"""Control function for steerable proxy"""
context = zmq.Context()
control = context.socket(zmq.REQ)
control.connect("inproc://control")
time.sleep(2)
# Pause proxy
control.send_string("PAUSE")
control.recv_string()
print("Proxy paused")
time.sleep(2)
# Resume proxy
control.send_string("RESUME")
control.recv_string()
print("Proxy resumed")
time.sleep(2)
# Terminate proxy
control.send_string("TERMINATE")
control.recv_string()
print("Proxy terminated")
control.close()
context.term()
def main():
context = zmq.Context()
# Create proxy sockets
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")
# Control socket
control = context.socket(zmq.REP)
control.bind("inproc://control")
# Start control thread
control_thread = threading.Thread(target=control_proxy)
control_thread.start()
try:
# Run steerable proxy
zmq.proxy_steerable(frontend, backend, None, control)
finally:
control_thread.join()
frontend.close()
backend.close()
control.close()
context.term()
main()import zmq
from zmq.devices import ProxyDevice
import time
context = zmq.Context()
# Create sockets
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")
# Create proxy device
device = ProxyDevice(frontend, backend)
try:
# Start proxy in background thread
device.start()
print("Proxy started in background")
# Do other work while proxy runs
for i in range(10):
print(f"Main thread working... {i}")
time.sleep(1)
# Wait for device to complete (won't happen in this example)
device.join(timeout=1.0)
except KeyboardInterrupt:
print("Main interrupted")
finally:
# Device will be cleaned up automatically
frontend.close()
backend.close()
context.term()import zmq
from zmq.devices import MonitoredQueueDevice
context = zmq.Context()
# Create queue sockets
input_socket = context.socket(zmq.PULL)
input_socket.bind("tcp://*:5557")
output_socket = context.socket(zmq.PUSH)
output_socket.bind("tcp://*:5558")
# Monitoring socket
monitor_socket = context.socket(zmq.PUB)
monitor_socket.bind("tcp://*:5559")
# Create monitored queue device
device = MonitoredQueueDevice(input_socket, output_socket, monitor_socket)
try:
device.start()
print("Monitored queue started")
# Monitor the queue
monitor_client = context.socket(zmq.SUB)
monitor_client.connect("tcp://localhost:5559")
monitor_client.setsockopt(zmq.SUBSCRIBE, b"")
# Process monitoring messages
while True:
try:
message = monitor_client.recv_string(zmq.NOBLOCK)
print(f"Monitor: {message}")
except zmq.Again:
time.sleep(0.1)
except KeyboardInterrupt:
print("Monitored queue interrupted")
finally:
device.join()
input_socket.close()
output_socket.close()
monitor_socket.close()
monitor_client.close()
context.term()import zmq
from zmq.devices import Device
import json
import time
class LoggingDevice(Device):
"""Custom device that logs all messages"""
def __init__(self, frontend, backend, log_file="messages.log"):
super().__init__(zmq.QUEUE, frontend, backend)
self.log_file = log_file
def run(self):
"""Custom run method with logging"""
poller = zmq.Poller()
poller.register(self.frontend_socket, zmq.POLLIN)
poller.register(self.backend_socket, zmq.POLLIN)
with open(self.log_file, 'w') as log:
while True:
events = poller.poll()
for socket, event in events:
if socket is self.frontend_socket and event & zmq.POLLIN:
# Forward frontend -> backend
message = self.frontend_socket.recv_multipart()
self.backend_socket.send_multipart(message)
# Log message
log_entry = {
'timestamp': time.time(),
'direction': 'frontend->backend',
'message': [part.decode('utf-8', errors='ignore') for part in message]
}
log.write(json.dumps(log_entry) + '\n')
log.flush()
elif socket is self.backend_socket and event & zmq.POLLIN:
# Forward backend -> frontend
message = self.backend_socket.recv_multipart()
self.frontend_socket.send_multipart(message)
# Log message
log_entry = {
'timestamp': time.time(),
'direction': 'backend->frontend',
'message': [part.decode('utf-8', errors='ignore') for part in message]
}
log.write(json.dumps(log_entry) + '\n')
log.flush()
# Usage
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")
device = LoggingDevice(frontend, backend, "proxy.log")
try:
device.run()
except KeyboardInterrupt:
print("Logging device interrupted")
finally:
frontend.close()
backend.close()
context.term()import zmq
from zmq.devices import ThreadDevice
import random
class LoadBalancerDevice(ThreadDevice):
"""Load balancer that distributes work across multiple backends"""
def __init__(self, frontend, backends):
# Use first backend as representative
super().__init__(zmq.QUEUE, frontend, backends[0])
self.backends = backends
def run(self):
"""Custom load balancing logic"""
poller = zmq.Poller()
poller.register(self.frontend_socket, zmq.POLLIN)
# Track available backends
available_backends = list(self.backends)
backend_poller = zmq.Poller()
for backend in self.backends:
backend_poller.register(backend, zmq.POLLIN)
while True:
# Check for frontend requests
if poller.poll(10): # 10ms timeout
message = self.frontend_socket.recv_multipart()
# Select available backend
if available_backends:
backend = random.choice(available_backends)
backend.send_multipart(message)
print(f"Sent to backend {self.backends.index(backend)}")
# Check for backend responses
backend_events = backend_poller.poll(10)
for backend, event in backend_events:
if event & zmq.POLLIN:
response = backend.recv_multipart()
self.frontend_socket.send_multipart(response)
print(f"Response from backend {self.backends.index(backend)}")
# Usage
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
# Multiple backend sockets
backends = []
for i in range(3):
backend = context.socket(zmq.DEALER)
backend.bind(f"tcp://*:{5560 + i}")
backends.append(backend)
device = LoadBalancerDevice(frontend, backends)
try:
device.start()
print("Load balancer started with 3 backends")
# Keep main thread alive
device.join()
except KeyboardInterrupt:
print("Load balancer interrupted")
finally:
frontend.close()
for backend in backends:
backend.close()
context.term()import zmq
from zmq.devices import Device
import time
import threading
class StatisticsDevice(Device):
"""Device that tracks message statistics"""
def __init__(self, frontend, backend):
super().__init__(zmq.QUEUE, frontend, backend)
self.stats = {
'messages_forwarded': 0,
'bytes_forwarded': 0,
'start_time': time.time(),
'last_message_time': 0
}
self.stats_lock = threading.Lock()
def get_stats(self):
"""Get current statistics"""
with self.stats_lock:
runtime = time.time() - self.stats['start_time']
return {
**self.stats,
'runtime_seconds': runtime,
'messages_per_second': self.stats['messages_forwarded'] / max(runtime, 1),
'bytes_per_second': self.stats['bytes_forwarded'] / max(runtime, 1)
}
def run(self):
"""Run with statistics tracking"""
poller = zmq.Poller()
poller.register(self.frontend_socket, zmq.POLLIN)
poller.register(self.backend_socket, zmq.POLLIN)
while True:
events = poller.poll()
for socket, event in events:
if event & zmq.POLLIN:
message = socket.recv_multipart()
# Update statistics
with self.stats_lock:
self.stats['messages_forwarded'] += 1
self.stats['bytes_forwarded'] += sum(len(part) for part in message)
self.stats['last_message_time'] = time.time()
# Forward message
if socket is self.frontend_socket:
self.backend_socket.send_multipart(message)
else:
self.frontend_socket.send_multipart(message)
# Usage with statistics reporting
def print_stats(device):
"""Print statistics periodically"""
while True:
time.sleep(5)
stats = device.get_stats()
print(f"Messages: {stats['messages_forwarded']}, "
f"Rate: {stats['messages_per_second']:.1f} msg/s, "
f"Bytes: {stats['bytes_forwarded']}")
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind("tcp://*:5559")
backend = context.socket(zmq.DEALER)
backend.bind("tcp://*:5560")
device = StatisticsDevice(frontend, backend)
# Start statistics reporter
stats_thread = threading.Thread(target=print_stats, args=(device,))
stats_thread.daemon = True
stats_thread.start()
try:
device.run()
except KeyboardInterrupt:
print("\nFinal statistics:")
stats = device.get_stats()
for key, value in stats.items():
print(f" {key}: {value}")
finally:
frontend.close()
backend.close()
context.term()PyZMQ supports several predefined device types:
# Device type constants
zmq.QUEUE # Load-balancing queue device
zmq.FORWARDER # Message forwarder device
zmq.STREAMER # Message streamer deviceSteerable proxy devices accept these control commands:
# Control commands for steerable proxy
"PAUSE" # Pause message forwarding
"RESUME" # Resume message forwarding
"TERMINATE" # Terminate the proxy
"STATISTICS" # Get proxy statistics (if supported)from typing import Optional, Dict, Any, List
import threading
# Device types
DeviceType = int # QUEUE, FORWARDER, STREAMER constants
# Socket types for devices
DeviceSocket = Socket
FrontendSocket = Socket
BackendSocket = Socket
CaptureSocket = Optional[Socket]
ControlSocket = Optional[Socket]
MonitorSocket = Socket
# Statistics types
DeviceStats = Dict[str, Any]
MessageCount = int
ByteCount = int
Timestamp = float
# Thread types
DeviceThread = threading.Thread
JoinTimeout = Optional[float]
# Control types
ControlCommand = str # "PAUSE", "RESUME", "TERMINATE", etc.Install with Tessl CLI
npx tessl i tessl/pypi-pyzmq