0
# Devices
1
2
Background devices for message routing including proxy, queue, forwarder, and steerable proxy implementations with monitoring capabilities.
3
4
## Capabilities
5
6
### Proxy Functions
7
8
Built-in proxy functions for message routing between sockets.
9
10
```python { .api }
11
def proxy(frontend: Socket, backend: Socket, capture: Socket = None) -> None:
12
"""
13
Simple proxy connecting frontend and backend sockets.
14
15
Parameters:
16
- frontend: Frontend socket (e.g., ROUTER for clients)
17
- backend: Backend socket (e.g., DEALER for workers)
18
- capture: Optional socket to capture all messages
19
"""
20
21
def proxy_steerable(frontend: Socket, backend: Socket, capture: Socket = None, control: Socket = None) -> None:
22
"""
23
Steerable proxy with control socket for runtime management.
24
25
Parameters:
26
- frontend: Frontend socket
27
- backend: Backend socket
28
- capture: Optional capture socket
29
- control: Control socket for PAUSE/RESUME/TERMINATE commands
30
"""
31
```
32
33
### Device Base Classes
34
35
Base classes for creating custom background devices.
36
37
```python { .api }
38
class Device:
39
def __init__(self, device_type: int, frontend: Socket, backend: Socket) -> None:
40
"""
41
Base device class.
42
43
Parameters:
44
- device_type: Device type constant (QUEUE, FORWARDER, STREAMER)
45
- frontend: Frontend socket
46
- backend: Backend socket
47
"""
48
49
def run(self) -> None:
50
"""Run the device (blocking)."""
51
52
class ThreadDevice(Device):
53
def __init__(self, device_type: int, frontend: Socket, backend: Socket) -> None:
54
"""Thread-based device that runs in background."""
55
56
def start(self) -> None:
57
"""Start the device in a background thread."""
58
59
def join(self, timeout: float = None) -> None:
60
"""
61
Wait for device thread to terminate.
62
63
Parameters:
64
- timeout: Timeout in seconds (None for infinite)
65
"""
66
67
@property
68
def done(self) -> bool:
69
"""True if device has finished running."""
70
```
71
72
### Monitored Queue Device
73
74
Queue device with monitoring capabilities.
75
76
```python { .api }
77
class MonitoredQueue:
78
def __init__(self, in_socket: Socket, out_socket: Socket, mon_socket: Socket) -> None:
79
"""
80
Create a monitored queue device.
81
82
Parameters:
83
- in_socket: Input socket
84
- out_socket: Output socket
85
- mon_socket: Monitoring socket
86
"""
87
88
def run(self) -> None:
89
"""Run the monitored queue (blocking)."""
90
91
class MonitoredQueueDevice(ThreadDevice):
92
def __init__(self, in_socket: Socket, out_socket: Socket, mon_socket: Socket) -> None:
93
"""Thread-based monitored queue device."""
94
```
95
96
### Proxy Device Classes
97
98
High-level proxy device implementations.
99
100
```python { .api }
101
class ProxyDevice(ThreadDevice):
102
def __init__(self, frontend: Socket, backend: Socket, capture: Socket = None) -> None:
103
"""
104
Proxy device running in background thread.
105
106
Parameters:
107
- frontend: Frontend socket
108
- backend: Backend socket
109
- capture: Optional capture socket
110
"""
111
112
class ProxySteerableDevice(ThreadDevice):
113
def __init__(self, frontend: Socket, backend: Socket, capture: Socket = None, control: Socket = None) -> None:
114
"""
115
Steerable proxy device with control interface.
116
117
Parameters:
118
- frontend: Frontend socket
119
- backend: Backend socket
120
- capture: Optional capture socket
121
- control: Control socket for management
122
"""
123
124
def pause(self) -> None:
125
"""Pause message forwarding."""
126
127
def resume(self) -> None:
128
"""Resume message forwarding."""
129
130
def terminate(self) -> None:
131
"""Terminate the proxy device."""
132
```
133
134
## Usage Examples
135
136
### Basic Proxy
137
138
```python
139
import zmq
140
141
context = zmq.Context()
142
143
# Create frontend for clients
144
frontend = context.socket(zmq.ROUTER)
145
frontend.bind("tcp://*:5559")
146
147
# Create backend for workers
148
backend = context.socket(zmq.DEALER)
149
backend.bind("tcp://*:5560")
150
151
try:
152
# Run proxy (blocking)
153
zmq.proxy(frontend, backend)
154
except KeyboardInterrupt:
155
print("Proxy interrupted")
156
finally:
157
frontend.close()
158
backend.close()
159
context.term()
160
```
161
162
### Proxy with Message Capture
163
164
```python
165
import zmq
166
167
context = zmq.Context()
168
169
# Create sockets
170
frontend = context.socket(zmq.ROUTER)
171
frontend.bind("tcp://*:5559")
172
173
backend = context.socket(zmq.DEALER)
174
backend.bind("tcp://*:5560")
175
176
# Capture socket for monitoring
177
capture = context.socket(zmq.PUB)
178
capture.bind("tcp://*:5561")
179
180
try:
181
# All messages will be captured and published
182
zmq.proxy(frontend, backend, capture)
183
except KeyboardInterrupt:
184
print("Proxy with capture interrupted")
185
finally:
186
frontend.close()
187
backend.close()
188
capture.close()
189
context.term()
190
```
191
192
### Steerable Proxy
193
194
```python
195
import zmq
196
import threading
197
import time
198
199
def control_proxy():
200
"""Control function for steerable proxy"""
201
context = zmq.Context()
202
control = context.socket(zmq.REQ)
203
control.connect("inproc://control")
204
205
time.sleep(2)
206
207
# Pause proxy
208
control.send_string("PAUSE")
209
control.recv_string()
210
print("Proxy paused")
211
212
time.sleep(2)
213
214
# Resume proxy
215
control.send_string("RESUME")
216
control.recv_string()
217
print("Proxy resumed")
218
219
time.sleep(2)
220
221
# Terminate proxy
222
control.send_string("TERMINATE")
223
control.recv_string()
224
print("Proxy terminated")
225
226
control.close()
227
context.term()
228
229
def main():
230
context = zmq.Context()
231
232
# Create proxy sockets
233
frontend = context.socket(zmq.ROUTER)
234
frontend.bind("tcp://*:5559")
235
236
backend = context.socket(zmq.DEALER)
237
backend.bind("tcp://*:5560")
238
239
# Control socket
240
control = context.socket(zmq.REP)
241
control.bind("inproc://control")
242
243
# Start control thread
244
control_thread = threading.Thread(target=control_proxy)
245
control_thread.start()
246
247
try:
248
# Run steerable proxy
249
zmq.proxy_steerable(frontend, backend, None, control)
250
finally:
251
control_thread.join()
252
frontend.close()
253
backend.close()
254
control.close()
255
context.term()
256
257
main()
258
```
259
260
### Thread-based Proxy Device
261
262
```python
263
import zmq
264
from zmq.devices import ProxyDevice
265
import time
266
267
context = zmq.Context()
268
269
# Create sockets
270
frontend = context.socket(zmq.ROUTER)
271
frontend.bind("tcp://*:5559")
272
273
backend = context.socket(zmq.DEALER)
274
backend.bind("tcp://*:5560")
275
276
# Create proxy device
277
device = ProxyDevice(frontend, backend)
278
279
try:
280
# Start proxy in background thread
281
device.start()
282
print("Proxy started in background")
283
284
# Do other work while proxy runs
285
for i in range(10):
286
print(f"Main thread working... {i}")
287
time.sleep(1)
288
289
# Wait for device to complete (won't happen in this example)
290
device.join(timeout=1.0)
291
292
except KeyboardInterrupt:
293
print("Main interrupted")
294
finally:
295
# Device will be cleaned up automatically
296
frontend.close()
297
backend.close()
298
context.term()
299
```
300
301
### Monitored Queue
302
303
```python
304
import zmq
305
from zmq.devices import MonitoredQueueDevice
306
307
context = zmq.Context()
308
309
# Create queue sockets
310
input_socket = context.socket(zmq.PULL)
311
input_socket.bind("tcp://*:5557")
312
313
output_socket = context.socket(zmq.PUSH)
314
output_socket.bind("tcp://*:5558")
315
316
# Monitoring socket
317
monitor_socket = context.socket(zmq.PUB)
318
monitor_socket.bind("tcp://*:5559")
319
320
# Create monitored queue device
321
device = MonitoredQueueDevice(input_socket, output_socket, monitor_socket)
322
323
try:
324
device.start()
325
print("Monitored queue started")
326
327
# Monitor the queue
328
monitor_client = context.socket(zmq.SUB)
329
monitor_client.connect("tcp://localhost:5559")
330
monitor_client.setsockopt(zmq.SUBSCRIBE, b"")
331
332
# Process monitoring messages
333
while True:
334
try:
335
message = monitor_client.recv_string(zmq.NOBLOCK)
336
print(f"Monitor: {message}")
337
except zmq.Again:
338
time.sleep(0.1)
339
340
except KeyboardInterrupt:
341
print("Monitored queue interrupted")
342
finally:
343
device.join()
344
input_socket.close()
345
output_socket.close()
346
monitor_socket.close()
347
monitor_client.close()
348
context.term()
349
```
350
351
### Custom Device
352
353
```python
354
import zmq
355
from zmq.devices import Device
356
import json
357
import time
358
359
class LoggingDevice(Device):
360
"""Custom device that logs all messages"""
361
362
def __init__(self, frontend, backend, log_file="messages.log"):
363
super().__init__(zmq.QUEUE, frontend, backend)
364
self.log_file = log_file
365
366
def run(self):
367
"""Custom run method with logging"""
368
poller = zmq.Poller()
369
poller.register(self.frontend_socket, zmq.POLLIN)
370
poller.register(self.backend_socket, zmq.POLLIN)
371
372
with open(self.log_file, 'w') as log:
373
while True:
374
events = poller.poll()
375
376
for socket, event in events:
377
if socket is self.frontend_socket and event & zmq.POLLIN:
378
# Forward frontend -> backend
379
message = self.frontend_socket.recv_multipart()
380
self.backend_socket.send_multipart(message)
381
382
# Log message
383
log_entry = {
384
'timestamp': time.time(),
385
'direction': 'frontend->backend',
386
'message': [part.decode('utf-8', errors='ignore') for part in message]
387
}
388
log.write(json.dumps(log_entry) + '\n')
389
log.flush()
390
391
elif socket is self.backend_socket and event & zmq.POLLIN:
392
# Forward backend -> frontend
393
message = self.backend_socket.recv_multipart()
394
self.frontend_socket.send_multipart(message)
395
396
# Log message
397
log_entry = {
398
'timestamp': time.time(),
399
'direction': 'backend->frontend',
400
'message': [part.decode('utf-8', errors='ignore') for part in message]
401
}
402
log.write(json.dumps(log_entry) + '\n')
403
log.flush()
404
405
# Usage
406
context = zmq.Context()
407
408
frontend = context.socket(zmq.ROUTER)
409
frontend.bind("tcp://*:5559")
410
411
backend = context.socket(zmq.DEALER)
412
backend.bind("tcp://*:5560")
413
414
device = LoggingDevice(frontend, backend, "proxy.log")
415
416
try:
417
device.run()
418
except KeyboardInterrupt:
419
print("Logging device interrupted")
420
finally:
421
frontend.close()
422
backend.close()
423
context.term()
424
```
425
426
### Load Balancer Device
427
428
```python
429
import zmq
430
from zmq.devices import ThreadDevice
431
import random
432
433
class LoadBalancerDevice(ThreadDevice):
434
"""Load balancer that distributes work across multiple backends"""
435
436
def __init__(self, frontend, backends):
437
# Use first backend as representative
438
super().__init__(zmq.QUEUE, frontend, backends[0])
439
self.backends = backends
440
441
def run(self):
442
"""Custom load balancing logic"""
443
poller = zmq.Poller()
444
poller.register(self.frontend_socket, zmq.POLLIN)
445
446
# Track available backends
447
available_backends = list(self.backends)
448
backend_poller = zmq.Poller()
449
450
for backend in self.backends:
451
backend_poller.register(backend, zmq.POLLIN)
452
453
while True:
454
# Check for frontend requests
455
if poller.poll(10): # 10ms timeout
456
message = self.frontend_socket.recv_multipart()
457
458
# Select available backend
459
if available_backends:
460
backend = random.choice(available_backends)
461
backend.send_multipart(message)
462
print(f"Sent to backend {self.backends.index(backend)}")
463
464
# Check for backend responses
465
backend_events = backend_poller.poll(10)
466
for backend, event in backend_events:
467
if event & zmq.POLLIN:
468
response = backend.recv_multipart()
469
self.frontend_socket.send_multipart(response)
470
print(f"Response from backend {self.backends.index(backend)}")
471
472
# Usage
473
context = zmq.Context()
474
475
frontend = context.socket(zmq.ROUTER)
476
frontend.bind("tcp://*:5559")
477
478
# Multiple backend sockets
479
backends = []
480
for i in range(3):
481
backend = context.socket(zmq.DEALER)
482
backend.bind(f"tcp://*:{5560 + i}")
483
backends.append(backend)
484
485
device = LoadBalancerDevice(frontend, backends)
486
487
try:
488
device.start()
489
print("Load balancer started with 3 backends")
490
491
# Keep main thread alive
492
device.join()
493
494
except KeyboardInterrupt:
495
print("Load balancer interrupted")
496
finally:
497
frontend.close()
498
for backend in backends:
499
backend.close()
500
context.term()
501
```
502
503
### Device with Statistics
504
505
```python
506
import zmq
507
from zmq.devices import Device
508
import time
509
import threading
510
511
class StatisticsDevice(Device):
512
"""Device that tracks message statistics"""
513
514
def __init__(self, frontend, backend):
515
super().__init__(zmq.QUEUE, frontend, backend)
516
self.stats = {
517
'messages_forwarded': 0,
518
'bytes_forwarded': 0,
519
'start_time': time.time(),
520
'last_message_time': 0
521
}
522
self.stats_lock = threading.Lock()
523
524
def get_stats(self):
525
"""Get current statistics"""
526
with self.stats_lock:
527
runtime = time.time() - self.stats['start_time']
528
return {
529
**self.stats,
530
'runtime_seconds': runtime,
531
'messages_per_second': self.stats['messages_forwarded'] / max(runtime, 1),
532
'bytes_per_second': self.stats['bytes_forwarded'] / max(runtime, 1)
533
}
534
535
def run(self):
536
"""Run with statistics tracking"""
537
poller = zmq.Poller()
538
poller.register(self.frontend_socket, zmq.POLLIN)
539
poller.register(self.backend_socket, zmq.POLLIN)
540
541
while True:
542
events = poller.poll()
543
544
for socket, event in events:
545
if event & zmq.POLLIN:
546
message = socket.recv_multipart()
547
548
# Update statistics
549
with self.stats_lock:
550
self.stats['messages_forwarded'] += 1
551
self.stats['bytes_forwarded'] += sum(len(part) for part in message)
552
self.stats['last_message_time'] = time.time()
553
554
# Forward message
555
if socket is self.frontend_socket:
556
self.backend_socket.send_multipart(message)
557
else:
558
self.frontend_socket.send_multipart(message)
559
560
# Usage with statistics reporting
561
def print_stats(device):
562
"""Print statistics periodically"""
563
while True:
564
time.sleep(5)
565
stats = device.get_stats()
566
print(f"Messages: {stats['messages_forwarded']}, "
567
f"Rate: {stats['messages_per_second']:.1f} msg/s, "
568
f"Bytes: {stats['bytes_forwarded']}")
569
570
context = zmq.Context()
571
572
frontend = context.socket(zmq.ROUTER)
573
frontend.bind("tcp://*:5559")
574
575
backend = context.socket(zmq.DEALER)
576
backend.bind("tcp://*:5560")
577
578
device = StatisticsDevice(frontend, backend)
579
580
# Start statistics reporter
581
stats_thread = threading.Thread(target=print_stats, args=(device,))
582
stats_thread.daemon = True
583
stats_thread.start()
584
585
try:
586
device.run()
587
except KeyboardInterrupt:
588
print("\nFinal statistics:")
589
stats = device.get_stats()
590
for key, value in stats.items():
591
print(f" {key}: {value}")
592
finally:
593
frontend.close()
594
backend.close()
595
context.term()
596
```
597
598
## Device Types
599
600
PyZMQ supports several predefined device types:
601
602
```python
603
# Device type constants
604
zmq.QUEUE # Load-balancing queue device
605
zmq.FORWARDER # Message forwarder device
606
zmq.STREAMER # Message streamer device
607
```
608
609
## Control Commands
610
611
Steerable proxy devices accept these control commands:
612
613
```python
614
# Control commands for steerable proxy
615
"PAUSE" # Pause message forwarding
616
"RESUME" # Resume message forwarding
617
"TERMINATE" # Terminate the proxy
618
"STATISTICS" # Get proxy statistics (if supported)
619
```
620
621
## Types
622
623
```python { .api }
624
from typing import Optional, Dict, Any, List
625
import threading
626
627
# Device types
628
DeviceType = int # QUEUE, FORWARDER, STREAMER constants
629
630
# Socket types for devices
631
DeviceSocket = Socket
632
FrontendSocket = Socket
633
BackendSocket = Socket
634
CaptureSocket = Optional[Socket]
635
ControlSocket = Optional[Socket]
636
MonitorSocket = Socket
637
638
# Statistics types
639
DeviceStats = Dict[str, Any]
640
MessageCount = int
641
ByteCount = int
642
Timestamp = float
643
644
# Thread types
645
DeviceThread = threading.Thread
646
JoinTimeout = Optional[float]
647
648
# Control types
649
ControlCommand = str # "PAUSE", "RESUME", "TERMINATE", etc.
650
```