Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.
Advanced routing patterns providing raw ROUTER and DEALER socket access for building custom messaging topologies and complex routing scenarios. These patterns offer the most flexibility for creating sophisticated distributed messaging architectures, custom load balancing, and multi-hop routing systems.
Foundation class providing uniform interface for ROUTER and DEALER socket types with consistent message handling methods.
class ZmqBase(ZmqConnection):
"""
Base class for advanced ZMQ connection types with uniform interface.
Provides consistent wrapper API over underlying socket types while allowing
connection-specific implementations. Does not hide socket type differences
but provides more consistent interaction patterns.
"""
def sendMsg(self, message):
"""
Send single-part message.
Default implementation delegates to sendMultipart([message]).
Subclasses can override with connection-specific behavior.
Args:
message (bytes): Message content to send
"""
def sendMultipart(self, parts):
"""
Send multipart message.
Default implementation delegates to underlying ZmqConnection.send().
Subclasses can override with connection-specific routing logic.
Args:
parts (list): List of message parts (bytes)
"""
def messageReceived(self, message):
"""
Handle incoming message and delegate to gotMessage.
Args:
message (list): List of message parts from ZeroMQ
"""
def gotMessage(self, *args, **kwargs):
"""
Abstract method for handling received messages.
Must be implemented by subclasses with socket-specific signature.
"""Routes messages between multiple peers with explicit addressing. Can send messages to specific recipients and receive messages with sender identification.
class ZmqRouterConnection(ZmqBase):
"""
Router connection for advanced message routing.
Uses ZeroMQ ROUTER socket type. Can route messages to specific recipients
and receives messages with sender identity information.
Provides the foundation for building custom routing topologies.
"""
socketType = constants.ROUTER
def sendMsg(self, recipientId, message):
"""
Send single-part message to specific recipient.
Args:
recipientId (bytes): Identity of the target recipient
message (bytes): Message content to send
"""
def sendMultipart(self, recipientId, parts):
"""
Send multipart message to specific recipient.
Args:
recipientId (bytes): Identity of the target recipient
parts (list): List of message parts (bytes)
"""
def gotMessage(self, sender_id, *message):
"""
Abstract method called when message is received.
Must be implemented by subclasses to handle incoming messages.
Args:
sender_id (bytes): Identity of the message sender
*message: Variable number of message parts (bytes)
"""from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqRouterConnection
import json
import time
class MessageBroker(ZmqRouterConnection):
"""Central message broker using ROUTER socket."""
def __init__(self, factory, endpoint):
super().__init__(factory, endpoint)
self.clients = {} # Track connected clients
self.message_stats = {'received': 0, 'sent': 0, 'errors': 0}
print("Message broker started")
def gotMessage(self, sender_id, *message):
"""Handle incoming message from any client."""
self.message_stats['received'] += 1
sender_str = sender_id.decode('utf-8') if sender_id else 'unknown'
try:
# Parse message
msg_data = json.loads(message[0].decode('utf-8'))
msg_type = msg_data.get('type')
print(f"Received {msg_type} from {sender_str}")
# Update client registry
if sender_id not in self.clients:
self.clients[sender_id] = {
'first_seen': time.time(),
'last_seen': time.time(),
'message_count': 0
}
self.clients[sender_id]['last_seen'] = time.time()
self.clients[sender_id]['message_count'] += 1
# Route message based on type
if msg_type == 'register':
self.handle_registration(sender_id, msg_data)
elif msg_type == 'direct_message':
self.handle_direct_message(sender_id, msg_data)
elif msg_type == 'broadcast':
self.handle_broadcast(sender_id, msg_data)
elif msg_type == 'list_clients':
self.handle_list_clients(sender_id)
elif msg_type == 'ping':
self.handle_ping(sender_id, msg_data)
else:
self.send_error(sender_id, f"Unknown message type: {msg_type}")
except Exception as e:
print(f"Error processing message from {sender_str}: {e}")
self.message_stats['errors'] += 1
self.send_error(sender_id, f"Message processing error: {e}")
def handle_registration(self, client_id, msg_data):
"""Handle client registration."""
client_name = msg_data.get('name', 'unnamed')
self.clients[client_id]['name'] = client_name
response = {
'type': 'registration_ack',
'status': 'success',
'client_id': client_id.decode('utf-8'),
'message': f'Registered as {client_name}'
}
self.send_response(client_id, response)
def handle_direct_message(self, sender_id, msg_data):
"""Route message to specific recipient."""
target_name = msg_data.get('target')
content = msg_data.get('content')
# Find target client by name
target_id = None
for cid, info in self.clients.items():
if info.get('name') == target_name:
target_id = cid
break
if target_id:
# Forward message to target
forward_msg = {
'type': 'direct_message',
'from': self.clients[sender_id].get('name', 'unknown'),
'content': content,
'timestamp': time.time()
}
self.send_response(target_id, forward_msg)
# Confirm to sender
confirm_msg = {
'type': 'delivery_confirmation',
'target': target_name,
'status': 'delivered'
}
self.send_response(sender_id, confirm_msg)
else:
self.send_error(sender_id, f"Target client '{target_name}' not found")
def handle_broadcast(self, sender_id, msg_data):
"""Broadcast message to all connected clients except sender."""
content = msg_data.get('content')
sender_name = self.clients[sender_id].get('name', 'unknown')
broadcast_msg = {
'type': 'broadcast',
'from': sender_name,
'content': content,
'timestamp': time.time()
}
sent_count = 0
for client_id in self.clients:
if client_id != sender_id: # Don't send to sender
self.send_response(client_id, broadcast_msg)
sent_count += 1
# Confirm to sender
confirm_msg = {
'type': 'broadcast_confirmation',
'recipients': sent_count
}
self.send_response(sender_id, confirm_msg)
def handle_list_clients(self, requester_id):
"""Send list of connected clients."""
client_list = []
for client_id, info in self.clients.items():
client_list.append({
'name': info.get('name', 'unnamed'),
'connected_since': info['first_seen'],
'last_activity': info['last_seen'],
'message_count': info['message_count']
})
response = {
'type': 'client_list',
'clients': client_list,
'total': len(client_list)
}
self.send_response(requester_id, response)
def handle_ping(self, client_id, msg_data):
"""Respond to ping with pong."""
response = {
'type': 'pong',
'timestamp': time.time(),
'stats': self.message_stats
}
self.send_response(client_id, response)
def send_response(self, client_id, response_data):
"""Send response to specific client."""
message = json.dumps(response_data).encode('utf-8')
self.sendMsg(client_id, message)
self.message_stats['sent'] += 1
def send_error(self, client_id, error_message):
"""Send error response to client."""
error_response = {
'type': 'error',
'message': error_message,
'timestamp': time.time()
}
self.send_response(client_id, error_response)
# Start message broker
factory = ZmqFactory()
factory.registerForShutdown()
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
broker = MessageBroker(factory, endpoint)
print("Message broker listening on tcp://*:5555")
reactor.run()Provides raw DEALER socket functionality for peer-to-peer communication and custom routing scenarios.
class ZmqDealerConnection(ZmqBase):
"""
Dealer connection for peer-to-peer messaging.
Uses ZeroMQ DEALER socket type. Provides raw socket access without
built-in message correlation. Suitable for custom protocols and
advanced messaging patterns.
"""
socketType = constants.DEALER
def sendMsg(self, message):
"""
Send single-part message.
Args:
message (bytes): Message content to send
"""
def sendMultipart(self, parts):
"""
Send multipart message.
Args:
parts (list): List of message parts (bytes)
"""
def gotMessage(self, *args):
"""
Abstract method called when message is received.
Must be implemented by subclasses to handle incoming messages.
Args:
*args: Variable number of message parts (bytes)
"""from twisted.internet import reactor, defer
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqDealerConnection
import json
import uuid
class BrokerClient(ZmqDealerConnection):
"""Client connecting to message broker using DEALER socket."""
def __init__(self, factory, endpoint, client_name):
super().__init__(factory, endpoint)
self.client_name = client_name
self.pending_requests = {} # Track pending requests for correlation
self.is_registered = False
# Register with broker on connection
reactor.callWhenRunning(self.register)
def register(self):
"""Register with the message broker."""
register_msg = {
'type': 'register',
'name': self.client_name
}
self.send_message(register_msg)
def send_direct_message(self, target_name, content):
"""Send direct message to another client."""
msg = {
'type': 'direct_message',
'target': target_name,
'content': content
}
self.send_message(msg)
def send_broadcast(self, content):
"""Broadcast message to all clients."""
msg = {
'type': 'broadcast',
'content': content
}
self.send_message(msg)
def list_clients(self):
"""Request list of connected clients."""
msg = {'type': 'list_clients'}
self.send_message(msg)
def ping_broker(self):
"""Send ping to broker."""
msg = {'type': 'ping'}
self.send_message(msg)
def send_message(self, msg_data):
"""Send message to broker."""
message = json.dumps(msg_data).encode('utf-8')
self.sendMsg(message)
def gotMessage(self, *message):
"""Handle incoming message from broker."""
try:
msg_data = json.loads(message[0].decode('utf-8'))
msg_type = msg_data.get('type')
if msg_type == 'registration_ack':
self.handle_registration_ack(msg_data)
elif msg_type == 'direct_message':
self.handle_direct_message(msg_data)
elif msg_type == 'broadcast':
self.handle_broadcast(msg_data)
elif msg_type == 'client_list':
self.handle_client_list(msg_data)
elif msg_type == 'pong':
self.handle_pong(msg_data)
elif msg_type == 'delivery_confirmation':
self.handle_delivery_confirmation(msg_data)
elif msg_type == 'broadcast_confirmation':
self.handle_broadcast_confirmation(msg_data)
elif msg_type == 'error':
self.handle_error(msg_data)
else:
print(f"Unknown message type: {msg_type}")
except Exception as e:
print(f"Error processing message: {e}")
def handle_registration_ack(self, msg_data):
"""Handle registration acknowledgment."""
self.is_registered = True
print(f"✓ {msg_data['message']}")
def handle_direct_message(self, msg_data):
"""Handle incoming direct message."""
sender = msg_data['from']
content = msg_data['content']
timestamp = msg_data['timestamp']
print(f"💬 Direct from {sender}: {content}")
def handle_broadcast(self, msg_data):
"""Handle incoming broadcast message."""
sender = msg_data['from']
content = msg_data['content']
print(f"📢 Broadcast from {sender}: {content}")
def handle_client_list(self, msg_data):
"""Handle client list response."""
clients = msg_data['clients']
print(f"👥 Connected clients ({msg_data['total']}):")
for client in clients:
print(f" - {client['name']} (messages: {client['message_count']})")
def handle_pong(self, msg_data):
"""Handle pong response."""
stats = msg_data['stats']
print(f"🏓 Pong! Broker stats: {stats}")
def handle_delivery_confirmation(self, msg_data):
"""Handle message delivery confirmation."""
target = msg_data['target']
status = msg_data['status']
print(f"✓ Message to {target}: {status}")
def handle_broadcast_confirmation(self, msg_data):
"""Handle broadcast confirmation."""
count = msg_data['recipients']
print(f"✓ Broadcast sent to {count} recipients")
def handle_error(self, msg_data):
"""Handle error message from broker."""
error = msg_data['message']
print(f"❌ Error: {error}")
# Interactive client example
def create_interactive_client(client_name):
"""Create an interactive client for testing."""
factory = ZmqFactory()
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
client = BrokerClient(factory, endpoint, client_name)
def send_test_messages():
if not client.is_registered:
reactor.callLater(1.0, send_test_messages)
return
# Send various test messages
client.list_clients()
client.ping_broker()
client.send_broadcast(f"Hello from {client_name}!")
# Schedule periodic messages
reactor.callLater(10.0, send_test_messages)
reactor.callLater(2.0, send_test_messages)
return client
# Usage: Create multiple clients
factory = ZmqFactory()
factory.registerForShutdown()
client1 = create_interactive_client("Alice")
client2 = create_interactive_client("Bob")
client3 = create_interactive_client("Charlie")
print("Started 3 clients: Alice, Bob, Charlie")
print("They will automatically interact with the broker")
reactor.run()Complex routing topologies using ROUTER-DEALER combinations for building scalable distributed systems.
class RoutingNode(ZmqRouterConnection):
"""Intermediate routing node for multi-hop messaging."""
def __init__(self, factory, bind_endpoint, node_id, upstream_endpoints=None):
super().__init__(factory, bind_endpoint)
self.node_id = node_id
self.routing_table = {} # destination -> next_hop
self.upstream_connections = []
# Connect to upstream nodes
if upstream_endpoints:
for endpoint in upstream_endpoints:
dealer = ZmqDealerConnection(factory, endpoint)
dealer.messageReceived = self.handle_upstream_message
self.upstream_connections.append(dealer)
print(f"Routing node {node_id} started")
def gotMessage(self, sender_id, *message):
"""Handle message from downstream clients."""
try:
msg_data = json.loads(message[0].decode('utf-8'))
destination = msg_data.get('destination')
if destination == self.node_id:
# Message for this node
self.handle_local_message(sender_id, msg_data)
else:
# Route message based on routing table
self.route_message(sender_id, destination, msg_data)
except Exception as e:
print(f"Routing error: {e}")
def route_message(self, sender_id, destination, msg_data):
"""Route message to appropriate next hop."""
if destination in self.routing_table:
next_hop = self.routing_table[destination]
# Add routing information
msg_data['route_path'] = msg_data.get('route_path', []) + [self.node_id]
msg_data['original_sender'] = sender_id.decode('utf-8')
# Forward to next hop
forwarded_msg = json.dumps(msg_data).encode('utf-8')
if next_hop == 'upstream':
# Send via upstream connection
for conn in self.upstream_connections:
conn.sendMsg(forwarded_msg)
else:
# Send to local client
self.sendMsg(next_hop.encode('utf-8'), forwarded_msg)
else:
# Unknown destination - send error back
error_msg = {
'type': 'routing_error',
'message': f'Unknown destination: {destination}',
'original_message': msg_data
}
self.sendMsg(sender_id, json.dumps(error_msg).encode('utf-8'))
def handle_upstream_message(self, message):
"""Handle message from upstream routing node."""
try:
msg_data = json.loads(message[0].decode('utf-8'))
destination = msg_data.get('destination')
if destination in self.routing_table:
# Route locally
target_id = self.routing_table[destination].encode('utf-8')
self.sendMsg(target_id, message[0])
else:
print(f"Cannot route upstream message to {destination}")
except Exception as e:
print(f"Upstream message handling error: {e}")
def update_routing_table(self, destination, next_hop):
"""Update routing table entry."""
self.routing_table[destination] = next_hop
print(f"Route updated: {destination} -> {next_hop}")
# Create hierarchical routing topology
factory = ZmqFactory()
# Top-level router
top_router = RoutingNode(
factory,
ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555"),
"top_router"
)
# Regional routers
east_router = RoutingNode(
factory,
ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5556"),
"east_router",
[ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")]
)
west_router = RoutingNode(
factory,
ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5557"),
"west_router",
[ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")]
)
# Configure routing tables
top_router.update_routing_table("east_region", "upstream")
top_router.update_routing_table("west_region", "upstream")
print("Multi-hop routing topology created")Building custom messaging protocols using ROUTER-DEALER patterns.
class CustomProtocolRouter(ZmqRouterConnection):
"""Custom protocol implementation with message versioning and compression."""
PROTOCOL_VERSION = "1.0"
def __init__(self, factory, endpoint):
super().__init__(factory, endpoint)
self.session_store = {} # client_id -> session_info
self.message_handlers = {
'HELLO': self.handle_hello,
'DATA': self.handle_data,
'PING': self.handle_ping,
'BYE': self.handle_bye
}
def gotMessage(self, sender_id, *message):
"""Handle incoming protocol message."""
try:
# Parse protocol header
if len(message) < 2:
self.send_error(sender_id, "Invalid message format")
return
header = json.loads(message[0].decode('utf-8'))
payload = message[1]
# Validate protocol version
if header.get('version') != self.PROTOCOL_VERSION:
self.send_error(sender_id, "Unsupported protocol version")
return
# Extract message info
msg_type = header.get('type')
msg_id = header.get('id')
compressed = header.get('compressed', False)
# Decompress payload if needed
if compressed:
import zlib
payload = zlib.decompress(payload)
# Route to handler
if msg_type in self.message_handlers:
self.message_handlers[msg_type](sender_id, header, payload, msg_id)
else:
self.send_error(sender_id, f"Unknown message type: {msg_type}")
except Exception as e:
print(f"Protocol error: {e}")
self.send_error(sender_id, f"Protocol error: {e}")
def handle_hello(self, sender_id, header, payload, msg_id):
"""Handle client hello/handshake."""
try:
hello_data = json.loads(payload.decode('utf-8'))
client_name = hello_data.get('name', 'unknown')
# Create session
self.session_store[sender_id] = {
'name': client_name,
'connected_at': time.time(),
'last_ping': time.time(),
'message_count': 0
}
# Send welcome response
welcome_data = {
'status': 'welcome',
'server_time': time.time(),
'session_id': sender_id.decode('utf-8')
}
self.send_protocol_message(sender_id, 'WELCOME', welcome_data, msg_id)
print(f"Client {client_name} connected")
except Exception as e:
self.send_error(sender_id, f"Hello processing error: {e}")
def handle_data(self, sender_id, header, payload, msg_id):
"""Handle data message."""
if sender_id not in self.session_store:
self.send_error(sender_id, "Not authenticated")
return
# Update session
self.session_store[sender_id]['message_count'] += 1
self.session_store[sender_id]['last_activity'] = time.time()
# Process data (echo back with processing info)
processed_data = {
'original_size': len(payload),
'processed_at': time.time(),
'message_number': self.session_store[sender_id]['message_count'],
'echo': payload.decode('utf-8')[:100] # First 100 chars
}
self.send_protocol_message(sender_id, 'DATA_ACK', processed_data, msg_id)
def handle_ping(self, sender_id, header, payload, msg_id):
"""Handle ping message."""
if sender_id in self.session_store:
self.session_store[sender_id]['last_ping'] = time.time()
pong_data = {
'server_time': time.time(),
'client_count': len(self.session_store)
}
self.send_protocol_message(sender_id, 'PONG', pong_data, msg_id)
def handle_bye(self, sender_id, header, payload, msg_id):
"""Handle client disconnect."""
if sender_id in self.session_store:
session = self.session_store.pop(sender_id)
print(f"Client {session.get('name', 'unknown')} disconnected")
bye_data = {'status': 'goodbye'}
self.send_protocol_message(sender_id, 'BYE_ACK', bye_data, msg_id)
def send_protocol_message(self, client_id, msg_type, data, reply_to_id=None):
"""Send message using custom protocol format."""
import uuid
# Create protocol header
header = {
'version': self.PROTOCOL_VERSION,
'type': msg_type,
'id': str(uuid.uuid4()),
'timestamp': time.time(),
'compressed': False
}
if reply_to_id:
header['reply_to'] = reply_to_id
# Serialize payload
payload = json.dumps(data).encode('utf-8')
# Optional compression for large messages
if len(payload) > 1024:
import zlib
payload = zlib.compress(payload)
header['compressed'] = True
# Send as multipart message
header_bytes = json.dumps(header).encode('utf-8')
self.sendMultipart(client_id, [header_bytes, payload])
def send_error(self, client_id, error_message):
"""Send error message to client."""
error_data = {'error': error_message}
self.send_protocol_message(client_id, 'ERROR', error_data)
# Custom protocol client
class CustomProtocolClient(ZmqDealerConnection):
"""Client implementing custom protocol."""
def __init__(self, factory, endpoint, client_name):
super().__init__(factory, endpoint)
self.client_name = client_name
self.connected = False
# Send hello on connection
reactor.callWhenRunning(self.send_hello)
def send_hello(self):
"""Send hello message to server."""
hello_data = {'name': self.client_name}
self.send_protocol_message('HELLO', hello_data)
def send_protocol_message(self, msg_type, data):
"""Send message using custom protocol."""
import uuid
header = {
'version': "1.0",
'type': msg_type,
'id': str(uuid.uuid4()),
'timestamp': time.time(),
'compressed': False
}
payload = json.dumps(data).encode('utf-8')
header_bytes = json.dumps(header).encode('utf-8')
self.sendMultipart([header_bytes, payload])
def gotMessage(self, *message):
"""Handle incoming protocol message."""
try:
header = json.loads(message[0].decode('utf-8'))
payload = json.loads(message[1].decode('utf-8'))
msg_type = header['type']
if msg_type == 'WELCOME':
self.connected = True
print(f"✓ Connected as {self.client_name}")
elif msg_type == 'DATA_ACK':
print(f"Data processed: {payload}")
elif msg_type == 'PONG':
print(f"Pong: {payload}")
elif msg_type == 'ERROR':
print(f"Error: {payload['error']}")
except Exception as e:
print(f"Client protocol error: {e}")
# Usage
factory = ZmqFactory()
# Start custom protocol server
server_endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
server = CustomProtocolRouter(factory, server_endpoint)
# Create clients
client1 = CustomProtocolClient(
factory,
ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),
"TestClient1"
)
print("Custom protocol server and client started")Install with Tessl CLI
npx tessl i tessl/pypi-txzmq