CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-txzmq

Twisted bindings for ZeroMQ enabling asynchronous ZMQ socket integration with Twisted's reactor pattern.

Overview
Eval results
Files

docs/router-dealer.md

Router-Dealer Messaging

Advanced routing patterns providing raw ROUTER and DEALER socket access for building custom messaging topologies and complex routing scenarios. These patterns offer the most flexibility for creating sophisticated distributed messaging architectures, custom load balancing, and multi-hop routing systems.

Capabilities

Base Connection Class for Advanced Patterns

Foundation class providing uniform interface for ROUTER and DEALER socket types with consistent message handling methods.

class ZmqBase(ZmqConnection):
    """
    Base class for advanced ZMQ connection types with uniform interface.

    Provides a consistent wrapper API over the underlying socket types while
    still allowing connection-specific implementations. It does not hide the
    differences between socket types; it only makes interacting with them
    more uniform.

    This listing is an interface summary: each method shows its signature
    and documented contract only, not its implementation.
    """

    def sendMsg(self, message):
        """
        Send single-part message.

        Default implementation delegates to sendMultipart([message]).
        Subclasses can override with connection-specific behavior, including
        a different signature.

        Args:
            message (bytes): Message content to send
        """

    def sendMultipart(self, parts):
        """
        Send multipart message.

        Default implementation delegates to underlying ZmqConnection.send().
        Subclasses can override with connection-specific routing logic.

        Args:
            parts (list): List of message parts (bytes)
        """

    def messageReceived(self, message):
        """
        Handle incoming message and delegate to gotMessage.

        Args:
            message (list): List of message parts from ZeroMQ
        """

    def gotMessage(self, *args, **kwargs):
        """
        Abstract method for handling received messages.

        Must be implemented by subclasses with socket-specific signature;
        the positional arguments a subclass receives depend on its socket
        type.
        """

Router Connection

Routes messages between multiple peers with explicit addressing. Can send messages to specific recipients and receive messages with sender identification.

class ZmqRouterConnection(ZmqBase):
    """
    Router connection for advanced message routing.

    Uses ZeroMQ ROUTER socket type. Can route messages to specific recipients
    and receives messages with sender identity information.
    Provides the foundation for building custom routing topologies.

    Note that both send methods take the recipient identity as an extra
    leading argument compared with the ZmqBase signatures, and gotMessage
    receives the sender identity as its first argument.
    """

    socketType = constants.ROUTER

    def sendMsg(self, recipientId, message):
        """
        Send single-part message to specific recipient.

        Args:
            recipientId (bytes): Identity of the target recipient
            message (bytes): Message content to send
        """

    def sendMultipart(self, recipientId, parts):
        """
        Send multipart message to specific recipient.

        Args:
            recipientId (bytes): Identity of the target recipient
            parts (list): List of message parts (bytes)
        """

    def gotMessage(self, sender_id, *message):
        """
        Abstract method called when message is received.

        Must be implemented by subclasses to handle incoming messages.

        Args:
            sender_id (bytes): Identity of the message sender
            *message: Variable number of message parts (bytes)
        """

Router Usage Example

from twisted.internet import reactor
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqRouterConnection
import json
import time

class MessageBroker(ZmqRouterConnection):
    """Central message broker using ROUTER socket.

    Clients send single-part JSON messages carrying a ``type`` field
    (``register``, ``direct_message``, ``broadcast``, ``list_clients`` or
    ``ping``). Every reply is a single-part JSON message addressed to the
    peer's socket identity.

    Attributes:
        clients: Registry keyed by raw socket identity (bytes). Each entry
            records ``first_seen``, ``last_seen``, ``message_count`` and,
            once the client has registered, ``name``.
        message_stats: Counters for received, sent and failed messages.
    """

    def __init__(self, factory, endpoint):
        """Open the broker socket and initialise its bookkeeping.

        Args:
            factory: ZmqFactory that owns the connection.
            endpoint: ZmqEndpoint the ROUTER socket is opened on.
        """
        super().__init__(factory, endpoint)
        self.clients = {}  # identity (bytes) -> per-client info dict
        self.message_stats = {'received': 0, 'sent': 0, 'errors': 0}
        print("Message broker started")

    @staticmethod
    def _identity_text(identity):
        """Return a printable, JSON-safe rendering of a socket identity.

        Identities are opaque bytes and need not be valid UTF-8 (peers that
        set no identity get a binary one generated by ZeroMQ), so
        undecodable bytes are replaced instead of raising
        UnicodeDecodeError. Valid UTF-8 identities are returned unchanged.
        """
        return identity.decode('utf-8', errors='replace')

    def gotMessage(self, sender_id, *message):
        """Handle incoming message from any client.

        Parses the first frame as JSON, refreshes the sender's registry
        entry and dispatches on the ``type`` field. Any failure is counted
        in ``message_stats['errors']`` and reported back to the sender.

        Args:
            sender_id (bytes): Identity of the sending peer.
            *message: Message frames; only the first one is interpreted.
        """
        self.message_stats['received'] += 1
        sender_str = self._identity_text(sender_id) if sender_id else 'unknown'

        try:
            # Parse message
            msg_data = json.loads(message[0].decode('utf-8'))
            msg_type = msg_data.get('type')

            print(f"Received {msg_type} from {sender_str}")

            # Update client registry (created on first contact)
            now = time.time()
            client = self.clients.setdefault(sender_id, {
                'first_seen': now,
                'last_seen': now,
                'message_count': 0
            })
            client['last_seen'] = now
            client['message_count'] += 1

            # Route message based on type
            if msg_type == 'register':
                self.handle_registration(sender_id, msg_data)
            elif msg_type == 'direct_message':
                self.handle_direct_message(sender_id, msg_data)
            elif msg_type == 'broadcast':
                self.handle_broadcast(sender_id, msg_data)
            elif msg_type == 'list_clients':
                self.handle_list_clients(sender_id)
            elif msg_type == 'ping':
                self.handle_ping(sender_id, msg_data)
            else:
                self.send_error(sender_id, f"Unknown message type: {msg_type}")

        except Exception as e:
            # Boundary handler: a malformed message from one client must not
            # take the broker down, so report it and carry on.
            print(f"Error processing message from {sender_str}: {e}")
            self.message_stats['errors'] += 1
            self.send_error(sender_id, f"Message processing error: {e}")

    def handle_registration(self, client_id, msg_data):
        """Handle client registration.

        Stores the requested name on the client's registry entry and
        acknowledges with a ``registration_ack`` message.
        """
        client_name = msg_data.get('name', 'unnamed')
        self.clients[client_id]['name'] = client_name

        response = {
            'type': 'registration_ack',
            'status': 'success',
            # Tolerant decoding: the identity may be arbitrary binary data.
            'client_id': self._identity_text(client_id),
            'message': f'Registered as {client_name}'
        }
        self.send_response(client_id, response)

    def handle_direct_message(self, sender_id, msg_data):
        """Route message to specific recipient.

        The recipient is looked up by registered name; the first matching
        client wins. The sender receives either a delivery confirmation or
        an error when no client has that name.
        """
        target_name = msg_data.get('target')
        content = msg_data.get('content')

        # Find target client by name
        target_id = next(
            (cid for cid, info in self.clients.items()
             if info.get('name') == target_name),
            None,
        )

        if target_id is None:
            self.send_error(sender_id, f"Target client '{target_name}' not found")
            return

        # Forward message to target
        forward_msg = {
            'type': 'direct_message',
            'from': self.clients[sender_id].get('name', 'unknown'),
            'content': content,
            'timestamp': time.time()
        }
        self.send_response(target_id, forward_msg)

        # Confirm to sender
        confirm_msg = {
            'type': 'delivery_confirmation',
            'target': target_name,
            'status': 'delivered'
        }
        self.send_response(sender_id, confirm_msg)

    def handle_broadcast(self, sender_id, msg_data):
        """Broadcast message to all connected clients except sender."""
        broadcast_msg = {
            'type': 'broadcast',
            'from': self.clients[sender_id].get('name', 'unknown'),
            'content': msg_data.get('content'),
            'timestamp': time.time()
        }

        # Don't send to sender
        recipients = [cid for cid in self.clients if cid != sender_id]
        for client_id in recipients:
            self.send_response(client_id, broadcast_msg)

        # Confirm to sender
        confirm_msg = {
            'type': 'broadcast_confirmation',
            'recipients': len(recipients)
        }
        self.send_response(sender_id, confirm_msg)

    def handle_list_clients(self, requester_id):
        """Send list of connected clients."""
        client_list = [
            {
                'name': info.get('name', 'unnamed'),
                'connected_since': info['first_seen'],
                'last_activity': info['last_seen'],
                'message_count': info['message_count']
            }
            for info in self.clients.values()
        ]

        response = {
            'type': 'client_list',
            'clients': client_list,
            'total': len(client_list)
        }
        self.send_response(requester_id, response)

    def handle_ping(self, client_id, msg_data):
        """Respond to ping with pong, including the current counters."""
        response = {
            'type': 'pong',
            'timestamp': time.time(),
            'stats': self.message_stats
        }
        self.send_response(client_id, response)

    def send_response(self, client_id, response_data):
        """Send response to specific client.

        Args:
            client_id (bytes): Identity of the recipient.
            response_data (dict): JSON-serialisable message body.
        """
        message = json.dumps(response_data).encode('utf-8')
        self.sendMsg(client_id, message)
        self.message_stats['sent'] += 1

    def send_error(self, client_id, error_message):
        """Send error response to client."""
        error_response = {
            'type': 'error',
            'message': error_message,
            'timestamp': time.time()
        }
        self.send_response(client_id, error_response)

# Start message broker
# A single factory owns every connection created in this process.
factory = ZmqFactory()
# Presumably ties factory cleanup to reactor shutdown - see the txZMQ docs.
factory.registerForShutdown()

# Bind on all interfaces, TCP port 5555; clients connect to this address.
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
broker = MessageBroker(factory, endpoint)

print("Message broker listening on tcp://*:5555")
# Start the Twisted event loop; messages are only processed while it runs.
reactor.run()

Dealer Connection

Provides raw DEALER socket functionality for peer-to-peer communication and custom routing scenarios.

class ZmqDealerConnection(ZmqBase):
    """
    Dealer connection for peer-to-peer messaging.

    Uses ZeroMQ DEALER socket type. Provides raw socket access without
    built-in message correlation, so matching replies to requests is left
    to the application. Suitable for custom protocols and advanced
    messaging patterns.
    """

    socketType = constants.DEALER

    def sendMsg(self, message):
        """
        Send single-part message.

        Args:
            message (bytes): Message content to send
        """

    def sendMultipart(self, parts):
        """
        Send multipart message.

        Args:
            parts (list): List of message parts (bytes)
        """

    def gotMessage(self, *args):
        """
        Abstract method called when message is received.

        Must be implemented by subclasses to handle incoming messages.

        Args:
            *args: Variable number of message parts (bytes)
        """

Dealer Usage Example

from twisted.internet import reactor, defer
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqDealerConnection
import json
import uuid

class BrokerClient(ZmqDealerConnection):
    """Client connecting to message broker using DEALER socket.

    Every outgoing and incoming message is a single-part JSON document with
    a ``type`` field. The client registers under ``client_name`` as soon as
    the reactor is running and prints whatever the broker sends back.

    Attributes:
        client_name: Name announced to the broker on registration.
        pending_requests: Reserved for request correlation; not used by the
            methods in this class.
        is_registered: True once a ``registration_ack`` has been received.
    """

    def __init__(self, factory, endpoint, client_name):
        """Create the connection and schedule registration.

        Args:
            factory: ZmqFactory that owns the connection.
            endpoint: ZmqEndpoint of the broker.
            client_name (str): Name to register under.
        """
        super().__init__(factory, endpoint)
        self.client_name = client_name
        self.pending_requests = {}  # Track pending requests for correlation
        self.is_registered = False

        # Register with broker on connection
        reactor.callWhenRunning(self.register)

    def register(self):
        """Register with the message broker."""
        self.send_message({
            'type': 'register',
            'name': self.client_name
        })

    def send_direct_message(self, target_name, content):
        """Send direct message to another client, addressed by name."""
        self.send_message({
            'type': 'direct_message',
            'target': target_name,
            'content': content
        })

    def send_broadcast(self, content):
        """Broadcast message to all clients."""
        self.send_message({
            'type': 'broadcast',
            'content': content
        })

    def list_clients(self):
        """Request list of connected clients."""
        self.send_message({'type': 'list_clients'})

    def ping_broker(self):
        """Send ping to broker."""
        self.send_message({'type': 'ping'})

    def send_message(self, msg_data):
        """Send message to broker.

        Args:
            msg_data (dict): JSON-serialisable message body.
        """
        self.sendMsg(json.dumps(msg_data).encode('utf-8'))

    def gotMessage(self, *message):
        """Handle incoming message from broker.

        Decodes the first frame as JSON and dispatches on its ``type``
        field. Unknown types and processing failures are printed, never
        raised.
        """
        handlers = {
            'registration_ack': self.handle_registration_ack,
            'direct_message': self.handle_direct_message,
            'broadcast': self.handle_broadcast,
            'client_list': self.handle_client_list,
            'pong': self.handle_pong,
            'delivery_confirmation': self.handle_delivery_confirmation,
            'broadcast_confirmation': self.handle_broadcast_confirmation,
            'error': self.handle_error,
        }
        try:
            msg_data = json.loads(message[0].decode('utf-8'))
            msg_type = msg_data.get('type')

            # Only strings can name a handler; anything else JSON can carry
            # (numbers, lists, ...) is reported as an unknown type.
            handler = handlers.get(msg_type) if isinstance(msg_type, str) else None
            if handler is None:
                print(f"Unknown message type: {msg_type}")
            else:
                handler(msg_data)

        except Exception as e:
            # Boundary handler: keep the client alive on malformed input.
            print(f"Error processing message: {e}")

    def handle_registration_ack(self, msg_data):
        """Handle registration acknowledgment."""
        self.is_registered = True
        print(f"✓ {msg_data['message']}")

    def handle_direct_message(self, msg_data):
        """Handle incoming direct message."""
        sender = msg_data['from']
        content = msg_data['content']
        print(f"💬 Direct from {sender}: {content}")

    def handle_broadcast(self, msg_data):
        """Handle incoming broadcast message."""
        sender = msg_data['from']
        content = msg_data['content']
        print(f"📢 Broadcast from {sender}: {content}")

    def handle_client_list(self, msg_data):
        """Handle client list response."""
        print(f"👥 Connected clients ({msg_data['total']}):")
        for client in msg_data['clients']:
            print(f"  - {client['name']} (messages: {client['message_count']})")

    def handle_pong(self, msg_data):
        """Handle pong response."""
        stats = msg_data['stats']
        print(f"🏓 Pong! Broker stats: {stats}")

    def handle_delivery_confirmation(self, msg_data):
        """Handle message delivery confirmation."""
        target = msg_data['target']
        status = msg_data['status']
        print(f"✓ Message to {target}: {status}")

    def handle_broadcast_confirmation(self, msg_data):
        """Handle broadcast confirmation."""
        count = msg_data['recipients']
        print(f"✓ Broadcast sent to {count} recipients")

    def handle_error(self, msg_data):
        """Handle error message from broker."""
        error = msg_data['message']
        print(f"❌ Error: {error}")

# Interactive client example
def create_interactive_client(client_name, factory=None):
    """Create an interactive client for testing.

    The client connects to the broker at tcp://127.0.0.1:5555 and, once it
    has registered, sends a client-list request, a ping and a broadcast
    every 10 seconds.

    Args:
        client_name (str): Name the client registers under.
        factory: ZmqFactory to create the connection with. Pass one shared
            factory when creating several clients in the same process. When
            omitted, a new factory is created and registered for shutdown.

    Returns:
        The BrokerClient instance.
    """
    if factory is None:
        factory = ZmqFactory()
        factory.registerForShutdown()
    endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
    client = BrokerClient(factory, endpoint, client_name)

    def send_test_messages():
        if not client.is_registered:
            # Registration ack not seen yet; poll again shortly.
            reactor.callLater(1.0, send_test_messages)
            return

        # Send various test messages
        client.list_clients()
        client.ping_broker()
        client.send_broadcast(f"Hello from {client_name}!")

        # Schedule periodic messages
        reactor.callLater(10.0, send_test_messages)

    reactor.callLater(2.0, send_test_messages)
    return client

# Usage: Create multiple clients
# All clients share this one factory, which is the one registered for
# shutdown.
factory = ZmqFactory()
factory.registerForShutdown()

client1 = create_interactive_client("Alice", factory)
client2 = create_interactive_client("Bob", factory)
client3 = create_interactive_client("Charlie", factory)

print("Started 3 clients: Alice, Bob, Charlie")
print("They will automatically interact with the broker")
reactor.run()

Advanced Routing Patterns

Complex routing topologies using ROUTER-DEALER combinations for building scalable distributed systems.

Multi-Hop Routing

class RoutingNode(ZmqRouterConnection):
    """Intermediate routing node for multi-hop messaging.

    Downstream peers talk to the node's ROUTER socket. Optional DEALER
    connections link the node to upstream routing nodes. Messages are
    single-part JSON documents with a ``destination`` field, looked up in
    ``routing_table`` to find the next hop.

    Attributes:
        node_id: Name of this node; messages addressed to it are handled
            locally.
        routing_table: Maps destination -> next hop. A next hop is either
            ``UPSTREAM`` or the identity (str) of a locally connected peer.
        upstream_connections: DEALER connections to upstream nodes.
    """

    # Routing-table value meaning "forward over the upstream connections".
    UPSTREAM = 'upstream'

    def __init__(self, factory, bind_endpoint, node_id, upstream_endpoints=None):
        """Open the node socket and connect to any upstream nodes.

        Args:
            factory: ZmqFactory that owns all connections of this node.
            bind_endpoint: ZmqEndpoint for the node's own ROUTER socket.
            node_id (str): Name of this node.
            upstream_endpoints: Optional iterable of ZmqEndpoint objects,
                one per upstream node to connect to.
        """
        super().__init__(factory, bind_endpoint)
        self.node_id = node_id
        self.routing_table = {}  # destination -> next_hop
        self.upstream_connections = []

        # Connect to upstream nodes
        for endpoint in upstream_endpoints or ():
            dealer = ZmqDealerConnection(factory, endpoint)
            # Deliver the raw frame list of upstream traffic to this node.
            dealer.messageReceived = self.handle_upstream_message
            self.upstream_connections.append(dealer)

        print(f"Routing node {node_id} started")

    def gotMessage(self, sender_id, *message):
        """Handle message from downstream clients.

        Messages addressed to this node go to handle_local_message; all
        others are routed. Failures are printed, never raised.
        """
        try:
            msg_data = json.loads(message[0].decode('utf-8'))
            destination = msg_data.get('destination')

            if destination == self.node_id:
                # Message for this node
                self.handle_local_message(sender_id, msg_data)
            else:
                # Route message based on routing table
                self.route_message(sender_id, destination, msg_data)

        except Exception as e:
            print(f"Routing error: {e}")

    def handle_local_message(self, sender_id, msg_data):
        """Handle a message addressed to this node itself.

        The default implementation only logs the message; subclasses
        override it to act on node-directed messages.
        """
        print(f"Node {self.node_id} received local message: {msg_data}")

    def route_message(self, sender_id, destination, msg_data):
        """Route message to appropriate next hop.

        The sender gets a ``routing_error`` reply when the destination is
        unknown or when the route points upstream but this node has no
        upstream connection.
        """
        if destination not in self.routing_table:
            # Unknown destination - send error back
            self._send_routing_error(
                sender_id, f'Unknown destination: {destination}', msg_data)
            return

        next_hop = self.routing_table[destination]
        if next_hop == self.UPSTREAM and not self.upstream_connections:
            # Nothing to forward over: report it instead of dropping the
            # message silently.
            self._send_routing_error(
                sender_id, f'No upstream connection for destination: {destination}',
                msg_data)
            return

        # Add routing information
        msg_data['route_path'] = msg_data.get('route_path', []) + [self.node_id]
        # Identities are opaque bytes and may not be valid UTF-8 (ZeroMQ
        # generates binary ones for peers that set none), so decode
        # tolerantly.
        msg_data['original_sender'] = sender_id.decode('utf-8', errors='replace')

        # Forward to next hop
        forwarded_msg = json.dumps(msg_data).encode('utf-8')
        if next_hop == self.UPSTREAM:
            # Send via upstream connection
            for conn in self.upstream_connections:
                conn.sendMsg(forwarded_msg)
        else:
            # Send to local client
            self.sendMsg(next_hop.encode('utf-8'), forwarded_msg)

    def _send_routing_error(self, sender_id, reason, msg_data):
        """Reply to *sender_id* with a routing_error describing *reason*."""
        error_msg = {
            'type': 'routing_error',
            'message': reason,
            'original_message': msg_data
        }
        self.sendMsg(sender_id, json.dumps(error_msg).encode('utf-8'))

    def handle_upstream_message(self, message):
        """Handle message from upstream routing node.

        Args:
            message (list): Message frames; the first one is the JSON
                document, which is forwarded unchanged to the local peer.
        """
        try:
            msg_data = json.loads(message[0].decode('utf-8'))
            destination = msg_data.get('destination')

            next_hop = self.routing_table.get(destination)
            # A route pointing back upstream is not a local peer identity;
            # forwarding to it would bounce the message.
            if next_hop is None or next_hop == self.UPSTREAM:
                print(f"Cannot route upstream message to {destination}")
                return

            # Route locally
            self.sendMsg(next_hop.encode('utf-8'), message[0])

        except Exception as e:
            print(f"Upstream message handling error: {e}")

    def update_routing_table(self, destination, next_hop):
        """Update routing table entry.

        Args:
            destination: Destination name as used in message headers.
            next_hop (str): ``UPSTREAM`` or the identity of a local peer.
        """
        self.routing_table[destination] = next_hop
        print(f"Route updated: {destination} -> {next_hop}")

# Create hierarchical routing topology
# All three nodes live in this one process and share a single factory.
factory = ZmqFactory()

# Top-level router
# Created without upstream endpoints, so it has no upstream connections.
top_router = RoutingNode(
    factory,
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555"),
    "top_router"
)

# Regional routers
# Each binds its own port for downstream peers and connects upstream to the
# top-level router on port 5555.
east_router = RoutingNode(
    factory,
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5556"),
    "east_router",
    [ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")]
)

west_router = RoutingNode(
    factory,
    ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5557"),
    "west_router",
    [ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")]
)

# Configure routing tables
# NOTE(review): both routes use the "upstream" next hop, but top_router has
# no upstream connections to forward over, so traffic for these regions
# cannot reach the regional routers as configured - confirm the intended
# next hop (probably the regional routers' socket identities).
top_router.update_routing_table("east_region", "upstream")
top_router.update_routing_table("west_region", "upstream")

# NOTE(review): this snippet does not start the reactor; no messages flow
# until the caller runs it.
print("Multi-hop routing topology created")

Custom Protocol Implementation

Building custom messaging protocols using ROUTER-DEALER patterns.

class CustomProtocolRouter(ZmqRouterConnection):
    """Custom protocol implementation with message versioning and compression.

    Each protocol message has two frames: a JSON header (``version``,
    ``type``, ``id``, ``timestamp``, ``compressed`` and optionally
    ``reply_to``) followed by the payload. The payload is zlib-compressed
    when the header's ``compressed`` flag is set.

    Attributes:
        session_store: Maps client identity (bytes) -> session info dict,
            created by a HELLO message and removed by BYE.
        message_handlers: Maps message type -> handler taking
            ``(sender_id, header, payload, msg_id)``.
    """

    PROTOCOL_VERSION = "1.0"

    # Serialized payloads larger than this many bytes are sent compressed.
    COMPRESSION_THRESHOLD = 1024

    def __init__(self, factory, endpoint):
        """Open the server socket and set up session and handler tables.

        Args:
            factory: ZmqFactory that owns the connection.
            endpoint: ZmqEndpoint the ROUTER socket is opened on.
        """
        super().__init__(factory, endpoint)
        self.session_store = {}  # client_id -> session_info
        self.message_handlers = {
            'HELLO': self.handle_hello,
            'DATA': self.handle_data,
            'PING': self.handle_ping,
            'BYE': self.handle_bye
        }

    def gotMessage(self, sender_id, *message):
        """Handle incoming protocol message.

        Validates framing and protocol version, decompresses the payload if
        flagged, and dispatches to the handler for the message type. Every
        failure is reported to the sender as an ERROR message, correlated
        with the request id once the header has been parsed.
        """
        msg_id = None  # unknown until the header has been parsed
        try:
            # Parse protocol header
            if len(message) < 2:
                self.send_error(sender_id, "Invalid message format")
                return

            header = json.loads(message[0].decode('utf-8'))
            payload = message[1]
            msg_id = header.get('id')

            # Validate protocol version
            if header.get('version') != self.PROTOCOL_VERSION:
                self.send_error(sender_id, "Unsupported protocol version", msg_id)
                return

            # Extract message info
            msg_type = header.get('type')

            # Decompress payload if needed
            if header.get('compressed', False):
                import zlib
                # NOTE: the payload comes from the network and
                # zlib.decompress puts no bound on the output size; add a
                # limit before exposing this to untrusted peers.
                payload = zlib.decompress(payload)

            # Route to handler
            handler = self.message_handlers.get(msg_type)
            if handler is None:
                self.send_error(sender_id, f"Unknown message type: {msg_type}", msg_id)
            else:
                handler(sender_id, header, payload, msg_id)

        except Exception as e:
            # Boundary handler: one bad message must not stop the server.
            print(f"Protocol error: {e}")
            self.send_error(sender_id, f"Protocol error: {e}", msg_id)

    def handle_hello(self, sender_id, header, payload, msg_id):
        """Handle client hello/handshake.

        Creates (or replaces) the sender's session and replies with WELCOME.
        """
        try:
            hello_data = json.loads(payload.decode('utf-8'))
            client_name = hello_data.get('name', 'unknown')

            # Create session
            now = time.time()
            self.session_store[sender_id] = {
                'name': client_name,
                'connected_at': now,
                'last_ping': now,
                'last_activity': now,
                'message_count': 0
            }

            # Send welcome response
            welcome_data = {
                'status': 'welcome',
                'server_time': time.time(),
                # Identities are opaque bytes and may not be valid UTF-8, so
                # decode tolerantly rather than fail after the session has
                # already been stored.
                'session_id': sender_id.decode('utf-8', errors='replace')
            }

            self.send_protocol_message(sender_id, 'WELCOME', welcome_data, msg_id)
            print(f"Client {client_name} connected")

        except Exception as e:
            self.send_error(sender_id, f"Hello processing error: {e}", msg_id)

    def handle_data(self, sender_id, header, payload, msg_id):
        """Handle data message.

        Requires an existing session. Replies with DATA_ACK echoing the
        first 100 characters of the payload.
        """
        session = self.session_store.get(sender_id)
        if session is None:
            self.send_error(sender_id, "Not authenticated", msg_id)
            return

        # Update session
        session['message_count'] += 1
        session['last_activity'] = time.time()

        # Process data (echo back with processing info)
        processed_data = {
            'original_size': len(payload),
            'processed_at': time.time(),
            'message_number': session['message_count'],
            # The payload is arbitrary bytes; replace undecodable ones
            # instead of failing after the session was already updated.
            'echo': payload.decode('utf-8', errors='replace')[:100]  # First 100 chars
        }

        self.send_protocol_message(sender_id, 'DATA_ACK', processed_data, msg_id)

    def handle_ping(self, sender_id, header, payload, msg_id):
        """Handle ping message; answered even without a session."""
        if sender_id in self.session_store:
            self.session_store[sender_id]['last_ping'] = time.time()

        pong_data = {
            'server_time': time.time(),
            'client_count': len(self.session_store)
        }

        self.send_protocol_message(sender_id, 'PONG', pong_data, msg_id)

    def handle_bye(self, sender_id, header, payload, msg_id):
        """Handle client disconnect: drop the session and acknowledge."""
        session = self.session_store.pop(sender_id, None)
        if session is not None:
            print(f"Client {session.get('name', 'unknown')} disconnected")

        bye_data = {'status': 'goodbye'}
        self.send_protocol_message(sender_id, 'BYE_ACK', bye_data, msg_id)

    def send_protocol_message(self, client_id, msg_type, data, reply_to_id=None):
        """Send message using custom protocol format.

        Args:
            client_id (bytes): Identity of the recipient.
            msg_type (str): Protocol message type for the header.
            data: JSON-serialisable payload.
            reply_to_id: Id of the request being answered, if any; stored
                in the header as ``reply_to``.
        """
        import uuid

        # Create protocol header
        header = {
            'version': self.PROTOCOL_VERSION,
            'type': msg_type,
            'id': str(uuid.uuid4()),
            'timestamp': time.time(),
            'compressed': False
        }

        if reply_to_id:
            header['reply_to'] = reply_to_id

        # Serialize payload
        payload = json.dumps(data).encode('utf-8')

        # Optional compression for large messages
        if len(payload) > self.COMPRESSION_THRESHOLD:
            import zlib
            payload = zlib.compress(payload)
            header['compressed'] = True

        # Send as multipart message
        header_bytes = json.dumps(header).encode('utf-8')
        self.sendMultipart(client_id, [header_bytes, payload])

    def send_error(self, client_id, error_message, reply_to_id=None):
        """Send error message to client.

        Args:
            client_id (bytes): Identity of the recipient.
            error_message (str): Human-readable description.
            reply_to_id: Id of the failed request, when known, so the
                client can correlate the error.
        """
        error_data = {'error': error_message}
        self.send_protocol_message(client_id, 'ERROR', error_data, reply_to_id)

# Custom protocol client
class CustomProtocolClient(ZmqDealerConnection):
    """Client implementing custom protocol.

    Speaks the two-frame format: a JSON header frame followed by a JSON
    payload frame, which is zlib-compressed when the header's
    ``compressed`` flag is set.

    Attributes:
        client_name: Name announced in the HELLO message.
        connected: True once WELCOME has been received.
    """

    # Version written into every outgoing header.
    PROTOCOL_VERSION = "1.0"

    def __init__(self, factory, endpoint, client_name):
        """Create the connection and schedule the HELLO handshake.

        Args:
            factory: ZmqFactory that owns the connection.
            endpoint: ZmqEndpoint of the server.
            client_name (str): Name to announce.
        """
        super().__init__(factory, endpoint)
        self.client_name = client_name
        self.connected = False

        # Send hello on connection
        reactor.callWhenRunning(self.send_hello)

    def send_hello(self):
        """Send hello message to server."""
        hello_data = {'name': self.client_name}
        self.send_protocol_message('HELLO', hello_data)

    def send_protocol_message(self, msg_type, data):
        """Send message using custom protocol.

        Args:
            msg_type (str): Protocol message type for the header.
            data: JSON-serialisable payload (always sent uncompressed).
        """
        import uuid

        header = {
            'version': self.PROTOCOL_VERSION,
            'type': msg_type,
            'id': str(uuid.uuid4()),
            'timestamp': time.time(),
            'compressed': False
        }

        payload = json.dumps(data).encode('utf-8')
        header_bytes = json.dumps(header).encode('utf-8')

        self.sendMultipart([header_bytes, payload])

    def gotMessage(self, *message):
        """Handle incoming protocol message.

        Parses the header, decompresses the payload when flagged, and
        prints the outcome per message type. Failures are printed, never
        raised.
        """
        import zlib

        try:
            header = json.loads(message[0].decode('utf-8'))
            raw_payload = message[1]
            # Large payloads arrive zlib-compressed and must be inflated
            # before they can be decoded as JSON.
            if header.get('compressed'):
                raw_payload = zlib.decompress(raw_payload)
            payload = json.loads(raw_payload.decode('utf-8'))

            msg_type = header['type']

            if msg_type == 'WELCOME':
                self.connected = True
                print(f"✓ Connected as {self.client_name}")
            elif msg_type == 'DATA_ACK':
                print(f"Data processed: {payload}")
            elif msg_type == 'PONG':
                print(f"Pong: {payload}")
            elif msg_type == 'ERROR':
                print(f"Error: {payload['error']}")

        except Exception as e:
            # Boundary handler: keep the client alive on malformed input.
            print(f"Client protocol error: {e}")

# Usage
# Server and client share one factory because they run in the same process.
factory = ZmqFactory()

# Start custom protocol server
server_endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
server = CustomProtocolRouter(factory, server_endpoint)

# Create clients
# The client schedules its HELLO with reactor.callWhenRunning, so the
# handshake only happens once the reactor is running.
client1 = CustomProtocolClient(
    factory,
    ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),
    "TestClient1"
)

# NOTE(review): this snippet does not start the reactor; the caller has to
# run it for any messages to be exchanged.
print("Custom protocol server and client started")

Install with Tessl CLI

npx tessl i tessl/pypi-txzmq

docs

factory-connection.md

index.md

pubsub.md

pushpull.md

reqrep.md

router-dealer.md

tile.json