0
# Router-Dealer Messaging
1
2
Advanced routing patterns providing raw ROUTER and DEALER socket access for building custom messaging topologies and complex routing scenarios. These patterns offer the most flexibility for creating sophisticated distributed messaging architectures, custom load balancing, and multi-hop routing systems.
3
4
## Capabilities
5
6
### Base Connection Class for Advanced Patterns
7
8
Foundation class providing uniform interface for ROUTER and DEALER socket types with consistent message handling methods.
9
10
```python { .api }
11
class ZmqBase(ZmqConnection):
12
"""
13
Base class for advanced ZMQ connection types with uniform interface.
14
15
Provides consistent wrapper API over underlying socket types while allowing
16
connection-specific implementations. Does not hide socket type differences
17
but provides more consistent interaction patterns.
18
"""
19
20
def sendMsg(self, message):
21
"""
22
Send single-part message.
23
24
Default implementation delegates to sendMultipart([message]).
25
Subclasses can override with connection-specific behavior.
26
27
Args:
28
message (bytes): Message content to send
29
"""
30
31
def sendMultipart(self, parts):
32
"""
33
Send multipart message.
34
35
Default implementation delegates to underlying ZmqConnection.send().
36
Subclasses can override with connection-specific routing logic.
37
38
Args:
39
parts (list): List of message parts (bytes)
40
"""
41
42
def messageReceived(self, message):
43
"""
44
Handle incoming message and delegate to gotMessage.
45
46
Args:
47
message (list): List of message parts from ZeroMQ
48
"""
49
50
def gotMessage(self, *args, **kwargs):
51
"""
52
Abstract method for handling received messages.
53
54
Must be implemented by subclasses with socket-specific signature.
55
"""
56
```
57
58
### Router Connection
59
60
Routes messages between multiple peers with explicit addressing. Can send messages to specific recipients and receive messages with sender identification.
61
62
```python { .api }
63
class ZmqRouterConnection(ZmqBase):
64
"""
65
Router connection for advanced message routing.
66
67
Uses ZeroMQ ROUTER socket type. Can route messages to specific recipients
68
and receives messages with sender identity information.
69
Provides the foundation for building custom routing topologies.
70
"""
71
72
socketType = constants.ROUTER
73
74
def sendMsg(self, recipientId, message):
75
"""
76
Send single-part message to specific recipient.
77
78
Args:
79
recipientId (bytes): Identity of the target recipient
80
message (bytes): Message content to send
81
"""
82
83
def sendMultipart(self, recipientId, parts):
84
"""
85
Send multipart message to specific recipient.
86
87
Args:
88
recipientId (bytes): Identity of the target recipient
89
parts (list): List of message parts (bytes)
90
"""
91
92
def gotMessage(self, sender_id, *message):
93
"""
94
Abstract method called when message is received.
95
96
Must be implemented by subclasses to handle incoming messages.
97
98
Args:
99
sender_id (bytes): Identity of the message sender
100
*message: Variable number of message parts (bytes)
101
"""
102
```
103
104
#### Router Usage Example
105
106
```python
107
from twisted.internet import reactor
108
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqRouterConnection
109
import json
110
import time
111
112
class MessageBroker(ZmqRouterConnection):
113
"""Central message broker using ROUTER socket."""
114
115
def __init__(self, factory, endpoint):
116
super().__init__(factory, endpoint)
117
self.clients = {} # Track connected clients
118
self.message_stats = {'received': 0, 'sent': 0, 'errors': 0}
119
print("Message broker started")
120
121
def gotMessage(self, sender_id, *message):
122
"""Handle incoming message from any client."""
123
self.message_stats['received'] += 1
124
sender_str = sender_id.decode('utf-8') if sender_id else 'unknown'
125
126
try:
127
# Parse message
128
msg_data = json.loads(message[0].decode('utf-8'))
129
msg_type = msg_data.get('type')
130
131
print(f"Received {msg_type} from {sender_str}")
132
133
# Update client registry
134
if sender_id not in self.clients:
135
self.clients[sender_id] = {
136
'first_seen': time.time(),
137
'last_seen': time.time(),
138
'message_count': 0
139
}
140
141
self.clients[sender_id]['last_seen'] = time.time()
142
self.clients[sender_id]['message_count'] += 1
143
144
# Route message based on type
145
if msg_type == 'register':
146
self.handle_registration(sender_id, msg_data)
147
elif msg_type == 'direct_message':
148
self.handle_direct_message(sender_id, msg_data)
149
elif msg_type == 'broadcast':
150
self.handle_broadcast(sender_id, msg_data)
151
elif msg_type == 'list_clients':
152
self.handle_list_clients(sender_id)
153
elif msg_type == 'ping':
154
self.handle_ping(sender_id, msg_data)
155
else:
156
self.send_error(sender_id, f"Unknown message type: {msg_type}")
157
158
except Exception as e:
159
print(f"Error processing message from {sender_str}: {e}")
160
self.message_stats['errors'] += 1
161
self.send_error(sender_id, f"Message processing error: {e}")
162
163
def handle_registration(self, client_id, msg_data):
164
"""Handle client registration."""
165
client_name = msg_data.get('name', 'unnamed')
166
self.clients[client_id]['name'] = client_name
167
168
response = {
169
'type': 'registration_ack',
170
'status': 'success',
171
'client_id': client_id.decode('utf-8'),
172
'message': f'Registered as {client_name}'
173
}
174
self.send_response(client_id, response)
175
176
def handle_direct_message(self, sender_id, msg_data):
177
"""Route message to specific recipient."""
178
target_name = msg_data.get('target')
179
content = msg_data.get('content')
180
181
# Find target client by name
182
target_id = None
183
for cid, info in self.clients.items():
184
if info.get('name') == target_name:
185
target_id = cid
186
break
187
188
if target_id:
189
# Forward message to target
190
forward_msg = {
191
'type': 'direct_message',
192
'from': self.clients[sender_id].get('name', 'unknown'),
193
'content': content,
194
'timestamp': time.time()
195
}
196
self.send_response(target_id, forward_msg)
197
198
# Confirm to sender
199
confirm_msg = {
200
'type': 'delivery_confirmation',
201
'target': target_name,
202
'status': 'delivered'
203
}
204
self.send_response(sender_id, confirm_msg)
205
else:
206
self.send_error(sender_id, f"Target client '{target_name}' not found")
207
208
def handle_broadcast(self, sender_id, msg_data):
209
"""Broadcast message to all connected clients except sender."""
210
content = msg_data.get('content')
211
sender_name = self.clients[sender_id].get('name', 'unknown')
212
213
broadcast_msg = {
214
'type': 'broadcast',
215
'from': sender_name,
216
'content': content,
217
'timestamp': time.time()
218
}
219
220
sent_count = 0
221
for client_id in self.clients:
222
if client_id != sender_id: # Don't send to sender
223
self.send_response(client_id, broadcast_msg)
224
sent_count += 1
225
226
# Confirm to sender
227
confirm_msg = {
228
'type': 'broadcast_confirmation',
229
'recipients': sent_count
230
}
231
self.send_response(sender_id, confirm_msg)
232
233
def handle_list_clients(self, requester_id):
234
"""Send list of connected clients."""
235
client_list = []
236
for client_id, info in self.clients.items():
237
client_list.append({
238
'name': info.get('name', 'unnamed'),
239
'connected_since': info['first_seen'],
240
'last_activity': info['last_seen'],
241
'message_count': info['message_count']
242
})
243
244
response = {
245
'type': 'client_list',
246
'clients': client_list,
247
'total': len(client_list)
248
}
249
self.send_response(requester_id, response)
250
251
def handle_ping(self, client_id, msg_data):
252
"""Respond to ping with pong."""
253
response = {
254
'type': 'pong',
255
'timestamp': time.time(),
256
'stats': self.message_stats
257
}
258
self.send_response(client_id, response)
259
260
def send_response(self, client_id, response_data):
261
"""Send response to specific client."""
262
message = json.dumps(response_data).encode('utf-8')
263
self.sendMsg(client_id, message)
264
self.message_stats['sent'] += 1
265
266
def send_error(self, client_id, error_message):
267
"""Send error response to client."""
268
error_response = {
269
'type': 'error',
270
'message': error_message,
271
'timestamp': time.time()
272
}
273
self.send_response(client_id, error_response)
274
275
# Start message broker
276
factory = ZmqFactory()
277
factory.registerForShutdown()
278
279
endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
280
broker = MessageBroker(factory, endpoint)
281
282
print("Message broker listening on tcp://*:5555")
283
reactor.run()
284
```
285
286
### Dealer Connection
287
288
Provides raw DEALER socket functionality for peer-to-peer communication and custom routing scenarios.
289
290
```python { .api }
291
class ZmqDealerConnection(ZmqBase):
292
"""
293
Dealer connection for peer-to-peer messaging.
294
295
Uses ZeroMQ DEALER socket type. Provides raw socket access without
296
built-in message correlation. Suitable for custom protocols and
297
advanced messaging patterns.
298
"""
299
300
socketType = constants.DEALER
301
302
def sendMsg(self, message):
303
"""
304
Send single-part message.
305
306
Args:
307
message (bytes): Message content to send
308
"""
309
310
def sendMultipart(self, parts):
311
"""
312
Send multipart message.
313
314
Args:
315
parts (list): List of message parts (bytes)
316
"""
317
318
def gotMessage(self, *args):
319
"""
320
Abstract method called when message is received.
321
322
Must be implemented by subclasses to handle incoming messages.
323
324
Args:
325
*args: Variable number of message parts (bytes)
326
"""
327
```
328
329
#### Dealer Usage Example
330
331
```python
332
from twisted.internet import reactor, defer
333
from txzmq import ZmqFactory, ZmqEndpoint, ZmqEndpointType, ZmqDealerConnection
334
import json
335
import uuid
336
337
class BrokerClient(ZmqDealerConnection):
338
"""Client connecting to message broker using DEALER socket."""
339
340
def __init__(self, factory, endpoint, client_name):
341
super().__init__(factory, endpoint)
342
self.client_name = client_name
343
self.pending_requests = {} # Track pending requests for correlation
344
self.is_registered = False
345
346
# Register with broker on connection
347
reactor.callWhenRunning(self.register)
348
349
def register(self):
350
"""Register with the message broker."""
351
register_msg = {
352
'type': 'register',
353
'name': self.client_name
354
}
355
self.send_message(register_msg)
356
357
def send_direct_message(self, target_name, content):
358
"""Send direct message to another client."""
359
msg = {
360
'type': 'direct_message',
361
'target': target_name,
362
'content': content
363
}
364
self.send_message(msg)
365
366
def send_broadcast(self, content):
367
"""Broadcast message to all clients."""
368
msg = {
369
'type': 'broadcast',
370
'content': content
371
}
372
self.send_message(msg)
373
374
def list_clients(self):
375
"""Request list of connected clients."""
376
msg = {'type': 'list_clients'}
377
self.send_message(msg)
378
379
def ping_broker(self):
380
"""Send ping to broker."""
381
msg = {'type': 'ping'}
382
self.send_message(msg)
383
384
def send_message(self, msg_data):
385
"""Send message to broker."""
386
message = json.dumps(msg_data).encode('utf-8')
387
self.sendMsg(message)
388
389
def gotMessage(self, *message):
390
"""Handle incoming message from broker."""
391
try:
392
msg_data = json.loads(message[0].decode('utf-8'))
393
msg_type = msg_data.get('type')
394
395
if msg_type == 'registration_ack':
396
self.handle_registration_ack(msg_data)
397
elif msg_type == 'direct_message':
398
self.handle_direct_message(msg_data)
399
elif msg_type == 'broadcast':
400
self.handle_broadcast(msg_data)
401
elif msg_type == 'client_list':
402
self.handle_client_list(msg_data)
403
elif msg_type == 'pong':
404
self.handle_pong(msg_data)
405
elif msg_type == 'delivery_confirmation':
406
self.handle_delivery_confirmation(msg_data)
407
elif msg_type == 'broadcast_confirmation':
408
self.handle_broadcast_confirmation(msg_data)
409
elif msg_type == 'error':
410
self.handle_error(msg_data)
411
else:
412
print(f"Unknown message type: {msg_type}")
413
414
except Exception as e:
415
print(f"Error processing message: {e}")
416
417
def handle_registration_ack(self, msg_data):
418
"""Handle registration acknowledgment."""
419
self.is_registered = True
420
print(f"β {msg_data['message']}")
421
422
def handle_direct_message(self, msg_data):
423
"""Handle incoming direct message."""
424
sender = msg_data['from']
425
content = msg_data['content']
426
timestamp = msg_data['timestamp']
427
print(f"π¬ Direct from {sender}: {content}")
428
429
def handle_broadcast(self, msg_data):
430
"""Handle incoming broadcast message."""
431
sender = msg_data['from']
432
content = msg_data['content']
433
print(f"π’ Broadcast from {sender}: {content}")
434
435
def handle_client_list(self, msg_data):
436
"""Handle client list response."""
437
clients = msg_data['clients']
438
print(f"π₯ Connected clients ({msg_data['total']}):")
439
for client in clients:
440
print(f" - {client['name']} (messages: {client['message_count']})")
441
442
def handle_pong(self, msg_data):
443
"""Handle pong response."""
444
stats = msg_data['stats']
445
print(f"π Pong! Broker stats: {stats}")
446
447
def handle_delivery_confirmation(self, msg_data):
448
"""Handle message delivery confirmation."""
449
target = msg_data['target']
450
status = msg_data['status']
451
print(f"β Message to {target}: {status}")
452
453
def handle_broadcast_confirmation(self, msg_data):
454
"""Handle broadcast confirmation."""
455
count = msg_data['recipients']
456
print(f"β Broadcast sent to {count} recipients")
457
458
def handle_error(self, msg_data):
459
"""Handle error message from broker."""
460
error = msg_data['message']
461
print(f"β Error: {error}")
462
463
# Interactive client example
464
def create_interactive_client(client_name):
465
"""Create an interactive client for testing."""
466
factory = ZmqFactory()
467
endpoint = ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")
468
client = BrokerClient(factory, endpoint, client_name)
469
470
def send_test_messages():
471
if not client.is_registered:
472
reactor.callLater(1.0, send_test_messages)
473
return
474
475
# Send various test messages
476
client.list_clients()
477
client.ping_broker()
478
client.send_broadcast(f"Hello from {client_name}!")
479
480
# Schedule periodic messages
481
reactor.callLater(10.0, send_test_messages)
482
483
reactor.callLater(2.0, send_test_messages)
484
return client
485
486
# Usage: Create multiple clients
487
factory = ZmqFactory()
488
factory.registerForShutdown()
489
490
client1 = create_interactive_client("Alice")
491
client2 = create_interactive_client("Bob")
492
client3 = create_interactive_client("Charlie")
493
494
print("Started 3 clients: Alice, Bob, Charlie")
495
print("They will automatically interact with the broker")
496
reactor.run()
497
```
498
499
### Advanced Routing Patterns
500
501
Complex routing topologies using ROUTER-DEALER combinations for building scalable distributed systems.
502
503
#### Multi-Hop Routing
504
505
```python
506
class RoutingNode(ZmqRouterConnection):
507
"""Intermediate routing node for multi-hop messaging."""
508
509
def __init__(self, factory, bind_endpoint, node_id, upstream_endpoints=None):
510
super().__init__(factory, bind_endpoint)
511
self.node_id = node_id
512
self.routing_table = {} # destination -> next_hop
513
self.upstream_connections = []
514
515
# Connect to upstream nodes
516
if upstream_endpoints:
517
for endpoint in upstream_endpoints:
518
dealer = ZmqDealerConnection(factory, endpoint)
519
dealer.messageReceived = self.handle_upstream_message
520
self.upstream_connections.append(dealer)
521
522
print(f"Routing node {node_id} started")
523
524
def gotMessage(self, sender_id, *message):
525
"""Handle message from downstream clients."""
526
try:
527
msg_data = json.loads(message[0].decode('utf-8'))
528
destination = msg_data.get('destination')
529
530
if destination == self.node_id:
531
# Message for this node
532
self.handle_local_message(sender_id, msg_data)
533
else:
534
# Route message based on routing table
535
self.route_message(sender_id, destination, msg_data)
536
537
except Exception as e:
538
print(f"Routing error: {e}")
539
540
def route_message(self, sender_id, destination, msg_data):
541
"""Route message to appropriate next hop."""
542
if destination in self.routing_table:
543
next_hop = self.routing_table[destination]
544
# Add routing information
545
msg_data['route_path'] = msg_data.get('route_path', []) + [self.node_id]
546
msg_data['original_sender'] = sender_id.decode('utf-8')
547
548
# Forward to next hop
549
forwarded_msg = json.dumps(msg_data).encode('utf-8')
550
if next_hop == 'upstream':
551
# Send via upstream connection
552
for conn in self.upstream_connections:
553
conn.sendMsg(forwarded_msg)
554
else:
555
# Send to local client
556
self.sendMsg(next_hop.encode('utf-8'), forwarded_msg)
557
else:
558
# Unknown destination - send error back
559
error_msg = {
560
'type': 'routing_error',
561
'message': f'Unknown destination: {destination}',
562
'original_message': msg_data
563
}
564
self.sendMsg(sender_id, json.dumps(error_msg).encode('utf-8'))
565
566
def handle_upstream_message(self, message):
567
"""Handle message from upstream routing node."""
568
try:
569
msg_data = json.loads(message[0].decode('utf-8'))
570
destination = msg_data.get('destination')
571
572
if destination in self.routing_table:
573
# Route locally
574
target_id = self.routing_table[destination].encode('utf-8')
575
self.sendMsg(target_id, message[0])
576
else:
577
print(f"Cannot route upstream message to {destination}")
578
579
except Exception as e:
580
print(f"Upstream message handling error: {e}")
581
582
def update_routing_table(self, destination, next_hop):
583
"""Update routing table entry."""
584
self.routing_table[destination] = next_hop
585
print(f"Route updated: {destination} -> {next_hop}")
586
587
# Create hierarchical routing topology
588
factory = ZmqFactory()
589
590
# Top-level router
591
top_router = RoutingNode(
592
factory,
593
ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555"),
594
"top_router"
595
)
596
597
# Regional routers
598
east_router = RoutingNode(
599
factory,
600
ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5556"),
601
"east_router",
602
[ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")]
603
)
604
605
west_router = RoutingNode(
606
factory,
607
ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5557"),
608
"west_router",
609
[ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555")]
610
)
611
612
# Configure routing tables
613
top_router.update_routing_table("east_region", "upstream")
614
top_router.update_routing_table("west_region", "upstream")
615
616
print("Multi-hop routing topology created")
617
```
618
619
### Custom Protocol Implementation
620
621
Building custom messaging protocols using ROUTER-DEALER patterns.
622
623
```python
624
class CustomProtocolRouter(ZmqRouterConnection):
625
"""Custom protocol implementation with message versioning and compression."""
626
627
PROTOCOL_VERSION = "1.0"
628
629
def __init__(self, factory, endpoint):
630
super().__init__(factory, endpoint)
631
self.session_store = {} # client_id -> session_info
632
self.message_handlers = {
633
'HELLO': self.handle_hello,
634
'DATA': self.handle_data,
635
'PING': self.handle_ping,
636
'BYE': self.handle_bye
637
}
638
639
def gotMessage(self, sender_id, *message):
640
"""Handle incoming protocol message."""
641
try:
642
# Parse protocol header
643
if len(message) < 2:
644
self.send_error(sender_id, "Invalid message format")
645
return
646
647
header = json.loads(message[0].decode('utf-8'))
648
payload = message[1]
649
650
# Validate protocol version
651
if header.get('version') != self.PROTOCOL_VERSION:
652
self.send_error(sender_id, "Unsupported protocol version")
653
return
654
655
# Extract message info
656
msg_type = header.get('type')
657
msg_id = header.get('id')
658
compressed = header.get('compressed', False)
659
660
# Decompress payload if needed
661
if compressed:
662
import zlib
663
payload = zlib.decompress(payload)
664
665
# Route to handler
666
if msg_type in self.message_handlers:
667
self.message_handlers[msg_type](sender_id, header, payload, msg_id)
668
else:
669
self.send_error(sender_id, f"Unknown message type: {msg_type}")
670
671
except Exception as e:
672
print(f"Protocol error: {e}")
673
self.send_error(sender_id, f"Protocol error: {e}")
674
675
def handle_hello(self, sender_id, header, payload, msg_id):
676
"""Handle client hello/handshake."""
677
try:
678
hello_data = json.loads(payload.decode('utf-8'))
679
client_name = hello_data.get('name', 'unknown')
680
681
# Create session
682
self.session_store[sender_id] = {
683
'name': client_name,
684
'connected_at': time.time(),
685
'last_ping': time.time(),
686
'message_count': 0
687
}
688
689
# Send welcome response
690
welcome_data = {
691
'status': 'welcome',
692
'server_time': time.time(),
693
'session_id': sender_id.decode('utf-8')
694
}
695
696
self.send_protocol_message(sender_id, 'WELCOME', welcome_data, msg_id)
697
print(f"Client {client_name} connected")
698
699
except Exception as e:
700
self.send_error(sender_id, f"Hello processing error: {e}")
701
702
def handle_data(self, sender_id, header, payload, msg_id):
703
"""Handle data message."""
704
if sender_id not in self.session_store:
705
self.send_error(sender_id, "Not authenticated")
706
return
707
708
# Update session
709
self.session_store[sender_id]['message_count'] += 1
710
self.session_store[sender_id]['last_activity'] = time.time()
711
712
# Process data (echo back with processing info)
713
processed_data = {
714
'original_size': len(payload),
715
'processed_at': time.time(),
716
'message_number': self.session_store[sender_id]['message_count'],
717
'echo': payload.decode('utf-8')[:100] # First 100 chars
718
}
719
720
self.send_protocol_message(sender_id, 'DATA_ACK', processed_data, msg_id)
721
722
def handle_ping(self, sender_id, header, payload, msg_id):
723
"""Handle ping message."""
724
if sender_id in self.session_store:
725
self.session_store[sender_id]['last_ping'] = time.time()
726
727
pong_data = {
728
'server_time': time.time(),
729
'client_count': len(self.session_store)
730
}
731
732
self.send_protocol_message(sender_id, 'PONG', pong_data, msg_id)
733
734
def handle_bye(self, sender_id, header, payload, msg_id):
735
"""Handle client disconnect."""
736
if sender_id in self.session_store:
737
session = self.session_store.pop(sender_id)
738
print(f"Client {session.get('name', 'unknown')} disconnected")
739
740
bye_data = {'status': 'goodbye'}
741
self.send_protocol_message(sender_id, 'BYE_ACK', bye_data, msg_id)
742
743
def send_protocol_message(self, client_id, msg_type, data, reply_to_id=None):
744
"""Send message using custom protocol format."""
745
import uuid
746
747
# Create protocol header
748
header = {
749
'version': self.PROTOCOL_VERSION,
750
'type': msg_type,
751
'id': str(uuid.uuid4()),
752
'timestamp': time.time(),
753
'compressed': False
754
}
755
756
if reply_to_id:
757
header['reply_to'] = reply_to_id
758
759
# Serialize payload
760
payload = json.dumps(data).encode('utf-8')
761
762
# Optional compression for large messages
763
if len(payload) > 1024:
764
import zlib
765
payload = zlib.compress(payload)
766
header['compressed'] = True
767
768
# Send as multipart message
769
header_bytes = json.dumps(header).encode('utf-8')
770
self.sendMultipart(client_id, [header_bytes, payload])
771
772
def send_error(self, client_id, error_message):
773
"""Send error message to client."""
774
error_data = {'error': error_message}
775
self.send_protocol_message(client_id, 'ERROR', error_data)
776
777
# Custom protocol client
778
class CustomProtocolClient(ZmqDealerConnection):
779
"""Client implementing custom protocol."""
780
781
def __init__(self, factory, endpoint, client_name):
782
super().__init__(factory, endpoint)
783
self.client_name = client_name
784
self.connected = False
785
786
# Send hello on connection
787
reactor.callWhenRunning(self.send_hello)
788
789
def send_hello(self):
790
"""Send hello message to server."""
791
hello_data = {'name': self.client_name}
792
self.send_protocol_message('HELLO', hello_data)
793
794
def send_protocol_message(self, msg_type, data):
795
"""Send message using custom protocol."""
796
import uuid
797
798
header = {
799
'version': "1.0",
800
'type': msg_type,
801
'id': str(uuid.uuid4()),
802
'timestamp': time.time(),
803
'compressed': False
804
}
805
806
payload = json.dumps(data).encode('utf-8')
807
header_bytes = json.dumps(header).encode('utf-8')
808
809
self.sendMultipart([header_bytes, payload])
810
811
def gotMessage(self, *message):
812
"""Handle incoming protocol message."""
813
try:
814
header = json.loads(message[0].decode('utf-8'))
815
payload = json.loads(message[1].decode('utf-8'))
816
817
msg_type = header['type']
818
819
if msg_type == 'WELCOME':
820
self.connected = True
821
print(f"β Connected as {self.client_name}")
822
elif msg_type == 'DATA_ACK':
823
print(f"Data processed: {payload}")
824
elif msg_type == 'PONG':
825
print(f"Pong: {payload}")
826
elif msg_type == 'ERROR':
827
print(f"Error: {payload['error']}")
828
829
except Exception as e:
830
print(f"Client protocol error: {e}")
831
832
# Usage
833
factory = ZmqFactory()
834
835
# Start custom protocol server
836
server_endpoint = ZmqEndpoint(ZmqEndpointType.bind, "tcp://*:5555")
837
server = CustomProtocolRouter(factory, server_endpoint)
838
839
# Create clients
840
client1 = CustomProtocolClient(
841
factory,
842
ZmqEndpoint(ZmqEndpointType.connect, "tcp://127.0.0.1:5555"),
843
"TestClient1"
844
)
845
846
print("Custom protocol server and client started")
847
```