0
# Polling
1
2
High-performance event polling for monitoring multiple sockets simultaneously, with support for timeouts and different polling backends.
3
4
## Capabilities
5
6
### Poller Class
7
8
The Poller class provides efficient event monitoring for multiple sockets using platform-specific polling mechanisms.
9
10
```python { .api }
11
class Poller:
12
def __init__(self) -> None:
13
"""Create a new Poller instance."""
14
15
def register(self, socket: Union[Socket, int], flags: int = POLLIN | POLLOUT) -> None:
16
"""
17
Register a socket for polling.
18
19
Parameters:
20
- socket: Socket or file descriptor to monitor
21
- flags: Event flags to monitor (POLLIN, POLLOUT, POLLERR)
22
"""
23
24
def modify(self, socket: Union[Socket, int], flags: int) -> None:
25
"""
26
Modify polling flags for a registered socket.
27
28
Parameters:
29
- socket: Socket or file descriptor
30
- flags: New event flags
31
"""
32
33
def unregister(self, socket: Union[Socket, int]) -> None:
34
"""
35
Unregister a socket from polling.
36
37
Parameters:
38
- socket: Socket or file descriptor to unregister
39
"""
40
41
def poll(self, timeout: Optional[int] = None) -> list[tuple[Union[Socket, int], int]]:
42
"""
43
Poll for events on registered sockets.
44
45
Parameters:
46
- timeout: Timeout in milliseconds (None or a negative value for infinite, 0 for non-blocking)
47
48
Returns:
49
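- list: List of (socket, events) tuples for sockets with events; ZMQ sockets are returned as Socket objects, while native sockets and file descriptors are returned as their integer file descriptor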
50
"""
51
```
52
53
### Low-Level Polling
54
55
Direct access to ZMQ's polling functionality for maximum control.
56
57
```python { .api }
58
def zmq_poll(sockets: list[tuple[Socket, int]], timeout: int = -1) -> list[tuple[Socket, int]]:
59
"""
60
Poll sockets for events.
61
62
Parameters:
63
- sockets: List of (socket, flags) tuples to poll
64
- timeout: Timeout in milliseconds (-1 for infinite)
65
66
Returns:
67
- list: List of (socket, events) tuples for sockets with events
68
"""
69
```
70
71
### Select-Style Interface
72
73
Python select-compatible interface for polling sockets.
74
75
```python { .api }
76
def select(rlist: list[Socket], wlist: list[Socket], xlist: list[Socket], timeout: Optional[float] = None) -> tuple[list[Socket], list[Socket], list[Socket]]:
77
"""
78
Select-style polling interface.
79
80
Parameters:
81
- rlist: Sockets to check for readability
82
- wlist: Sockets to check for writability
83
- xlist: Sockets to check for errors
84
- timeout: Timeout in seconds (None for infinite)
85
86
Returns:
87
- tuple: (readable, writable, error) socket lists
88
"""
89
```
90
91
## Usage Examples
92
93
### Basic Polling
94
95
```python
96
import zmq
97
98
context = zmq.Context()
99
100
# Create multiple sockets
101
frontend = context.socket(zmq.ROUTER)
102
frontend.bind("tcp://*:5555")
103
104
backend = context.socket(zmq.DEALER)
105
backend.bind("tcp://*:5556")
106
107
# Create poller and register sockets
108
poller = zmq.Poller()
109
poller.register(frontend, zmq.POLLIN)
110
poller.register(backend, zmq.POLLIN)
111
112
try:
113
while True:
114
# Poll for events with 1 second timeout
115
events = poller.poll(1000)
116
117
if not events:
118
print("No events in 1 second")
119
continue
120
121
for socket, event in events:
122
if socket is frontend and event & zmq.POLLIN:
123
# Handle frontend message
124
message = frontend.recv_multipart()
125
print(f"Frontend received: {message}")
126
backend.send_multipart(message)
127
128
elif socket is backend and event & zmq.POLLIN:
129
# Handle backend message
130
message = backend.recv_multipart()
131
print(f"Backend received: {message}")
132
frontend.send_multipart(message)
133
134
except KeyboardInterrupt:
135
print("Interrupted")
136
finally:
137
frontend.close()
138
backend.close()
139
context.term()
140
```
141
142
### Proxy with Polling
143
144
```python
145
import zmq
146
147
def create_proxy():
148
context = zmq.Context()
149
150
# Frontend for clients
151
frontend = context.socket(zmq.ROUTER)
152
frontend.bind("tcp://*:5559")
153
154
# Backend for workers
155
backend = context.socket(zmq.DEALER)
156
backend.bind("tcp://*:5560")
157
158
# Control socket for shutdown
159
control = context.socket(zmq.SUB)
160
control.connect("inproc://control")
161
control.setsockopt(zmq.SUBSCRIBE, b"")
162
163
# Poll all sockets
164
poller = zmq.Poller()
165
poller.register(frontend, zmq.POLLIN)
166
poller.register(backend, zmq.POLLIN)
167
poller.register(control, zmq.POLLIN)
168
169
try:
170
while True:
171
events = poller.poll()
172
173
for socket, event in events:
174
if socket is control and event & zmq.POLLIN:
175
# Shutdown signal
176
message = control.recv()
177
if message == b"TERMINATE":
178
return
179
180
elif socket is frontend and event & zmq.POLLIN:
181
# Route frontend to backend
182
message = frontend.recv_multipart()
183
backend.send_multipart(message)
184
185
elif socket is backend and event & zmq.POLLIN:
186
# Route backend to frontend
187
message = backend.recv_multipart()
188
frontend.send_multipart(message)
189
finally:
190
frontend.close()
191
backend.close()
192
control.close()
193
context.term()
194
195
create_proxy()
196
```
197
198
### Non-Blocking Polling
199
200
```python
201
import zmq
202
import time
203
204
context = zmq.Context()
205
socket = context.socket(zmq.SUB)
206
socket.connect("tcp://localhost:5556")
207
socket.setsockopt(zmq.SUBSCRIBE, b"")
208
209
poller = zmq.Poller()
210
poller.register(socket, zmq.POLLIN)
211
212
try:
213
while True:
214
# Non-blocking poll (timeout = 0)
215
events = poller.poll(0)
216
217
if events:
218
# Process available messages
219
for sock, event in events:
220
if event & zmq.POLLIN:
221
message = sock.recv_string()
222
print(f"Received: {message}")
223
else:
224
# No messages available, do other work
225
print("No messages, doing other work...")
226
time.sleep(0.1)
227
228
except KeyboardInterrupt:
229
print("Interrupted")
230
finally:
231
socket.close()
232
context.term()
233
```
234
235
### Mixed Socket Types
236
237
```python
238
import zmq
239
import socket as py_socket
240
241
context = zmq.Context()
242
243
# ZMQ socket
244
zmq_socket = context.socket(zmq.SUB)
245
zmq_socket.connect("tcp://localhost:5556")
246
zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")
247
248
# Regular Python socket
249
tcp_socket = py_socket.socket(py_socket.AF_INET, py_socket.SOCK_STREAM)
250
tcp_socket.bind(("localhost", 8080))
251
tcp_socket.listen(5)
252
tcp_socket.setblocking(False)
253
254
# Poll both socket types
255
poller = zmq.Poller()
256
poller.register(zmq_socket, zmq.POLLIN)
257
poller.register(tcp_socket, zmq.POLLIN)  # Native sockets can be registered too; poll() reports them by integer file descriptor
258
259
try:
260
while True:
261
events = poller.poll(1000)
262
263
for sock, event in events:
264
if sock is zmq_socket and event & zmq.POLLIN:
265
# Handle ZMQ message
266
message = zmq_socket.recv_string()
267
print(f"ZMQ message: {message}")
268
269
elif sock == tcp_socket.fileno() and event & zmq.POLLIN:
270
# Handle TCP connection
271
try:
272
client_sock, addr = tcp_socket.accept()
273
print(f"TCP connection from {addr}")
274
client_sock.close()
275
except BlockingIOError:
276
pass
277
278
except KeyboardInterrupt:
279
print("Interrupted")
280
finally:
281
zmq_socket.close()
282
tcp_socket.close()
283
context.term()
284
```
285
286
### Select-Style Polling
287
288
```python
289
import zmq
290
291
context = zmq.Context()
292
293
# Create sockets
294
pub = context.socket(zmq.PUB)
295
pub.bind("tcp://*:5557")
296
297
sub1 = context.socket(zmq.SUB)
298
sub1.connect("tcp://localhost:5557")
299
sub1.setsockopt(zmq.SUBSCRIBE, b"")
300
301
sub2 = context.socket(zmq.SUB)
302
sub2.connect("tcp://localhost:5557")
303
sub2.setsockopt(zmq.SUBSCRIBE, b"")
304
305
# Send some messages (subscribers that have not finished connecting yet will miss them)
306
pub.send_string("Hello 1")
307
pub.send_string("Hello 2")
308
309
# Use select-style polling
310
readable, writable, error = zmq.select([sub1, sub2], [pub], [], timeout=1.0)
311
312
print(f"Readable sockets: {len(readable)}")
313
print(f"Writable sockets: {len(writable)}")
314
print(f"Error sockets: {len(error)}")
315
316
# Process readable sockets
317
for sock in readable:
318
message = sock.recv_string()
319
print(f"Received: {message}")
320
321
# Clean up
322
pub.close()
323
sub1.close()
324
sub2.close()
325
context.term()
326
```
327
328
### Timeout Handling
329
330
```python
331
import zmq
332
import time
333
334
context = zmq.Context()
335
socket = context.socket(zmq.REQ)
336
socket.connect("tcp://localhost:5555")
337
338
poller = zmq.Poller()
339
poller.register(socket, zmq.POLLIN)
340
341
# Send request
342
socket.send_string("Hello Server")
343
start_time = time.time()
344
345
try:
346
# Poll with timeout
347
events = poller.poll(5000) # 5 second timeout
348
349
if events:
350
# Response received
351
response = socket.recv_string()
352
elapsed = time.time() - start_time
353
print(f"Response: {response} (took {elapsed:.2f}s)")
354
else:
355
# Timeout occurred
356
print("Request timed out after 5 seconds")
357
358
# A REQ socket cannot send again until it has received a reply, so discard it and create a new one
359
socket.close()
360
socket = context.socket(zmq.REQ)
361
socket.connect("tcp://localhost:5555")
362
363
except KeyboardInterrupt:
364
print("Interrupted")
365
finally:
366
socket.close()
367
context.term()
368
```
369
370
### Event Flag Combinations
371
372
```python
373
import zmq
374
375
context = zmq.Context()
376
socket = context.socket(zmq.DEALER)
377
socket.connect("tcp://localhost:5555")
378
379
poller = zmq.Poller()
380
381
# Register for different event combinations
382
poller.register(socket, zmq.POLLIN) # Read only
383
# poller.register(socket, zmq.POLLOUT) # Write only
384
# poller.register(socket, zmq.POLLIN | zmq.POLLOUT) # Read and write
385
# poller.register(socket, zmq.POLLERR) # Error only
386
387
try:
388
while True:
389
events = poller.poll(1000)
390
391
for sock, event in events:
392
if event & zmq.POLLIN:
393
print("Socket is readable")
394
message = sock.recv_string()
395
print(f"Received: {message}")
396
397
if event & zmq.POLLOUT:
398
print("Socket is writable")
399
sock.send_string("Response")
400
401
if event & zmq.POLLERR:
402
print("Socket has error")
403
break
404
405
except KeyboardInterrupt:
406
print("Interrupted")
407
finally:
408
socket.close()
409
context.term()
410
```
411
412
### Dynamic Socket Registration
413
414
```python
415
import zmq
416
import random
417
418
context = zmq.Context()
419
poller = zmq.Poller()
420
421
# Track active sockets
422
sockets = {}
423
424
def add_socket(name):
425
"""Add a new socket to polling"""
426
socket = context.socket(zmq.SUB)
427
socket.connect("tcp://localhost:5556")
428
socket.setsockopt(zmq.SUBSCRIBE, name.encode())
429
430
sockets[name] = socket
431
poller.register(socket, zmq.POLLIN)
432
print(f"Added socket: {name}")
433
434
def remove_socket(name):
435
"""Remove socket from polling"""
436
if name in sockets:
437
socket = sockets[name]
438
poller.unregister(socket)
439
socket.close()
440
del sockets[name]
441
print(f"Removed socket: {name}")
442
443
try:
444
# Start with some sockets
445
add_socket("weather")
446
add_socket("news")
447
448
while True:
449
events = poller.poll(1000)
450
451
if events:
452
for socket, event in events:
453
if event & zmq.POLLIN:
454
message = socket.recv_string()
455
print(f"Received: {message}")
456
else:
457
# Randomly add/remove sockets for demonstration
458
action = random.choice(["add", "remove", "nothing"])
459
460
if action == "add" and len(sockets) < 5:
461
name = f"topic_{random.randint(1, 100)}"
462
if name not in sockets:
463
add_socket(name)
464
465
elif action == "remove" and len(sockets) > 1:
466
name = random.choice(list(sockets.keys()))
467
remove_socket(name)
468
469
except KeyboardInterrupt:
470
print("Interrupted")
471
finally:
472
# Clean up all sockets
473
for socket in sockets.values():
474
socket.close()
475
context.term()
476
```
477
478
## Performance Tips
479
480
### Efficient Polling Patterns
481
482
```python
483
import zmq
484
485
# Reuse poller instance
486
poller = zmq.Poller()
487
488
# Register sockets once
489
for socket in sockets:
490
poller.register(socket, zmq.POLLIN)
491
492
# Poll in tight loop
493
while True:
494
# Use appropriate timeout
495
events = poller.poll(100) # Short timeout for responsiveness
496
497
if not events:
498
continue
499
500
# Process events efficiently
501
for socket, event in events:
502
# Handle events without blocking
503
if event & zmq.POLLIN:
504
message = socket.recv(zmq.NOBLOCK)
505
```
506
507
### Memory Management
508
509
```python
510
import zmq
511
512
# Unregister sockets before closing
513
poller.unregister(socket)
514
socket.close()
515
516
# Reuse poller instances when possible
517
# Creating new pollers frequently can be expensive
518
```
519
520
## Types
521
522
```python { .api }
523
from typing import Union, List, Tuple, Optional
524
525
# Socket types for polling
526
PollSocket = Union[Socket, int] # Socket or file descriptor
527
528
# Event flags
529
EventFlags = int # Combination of POLLIN, POLLOUT, POLLERR
530
531
# Poll results
532
PollEvent = Tuple[PollSocket, int]  # (socket or file descriptor, events)
533
PollResult = List[PollEvent]
534
535
# Select-style results
536
SelectResult = Tuple[List[Socket], List[Socket], List[Socket]] # (readable, writable, error)
537
538
# Timeout types
539
PollTimeout = Optional[int]  # Milliseconds (None or negative for infinite)
540
SelectTimeout = Optional[float] # Seconds (None for infinite)
541
```