0
# Error Handling
1
2
Exception hierarchy for ZMQ-specific errors with errno mapping, context information, and specialized exceptions for different error conditions.
3
4
## Capabilities
5
6
### Base Exception Classes
7
8
Foundation exception classes for ZMQ error handling.
9
10
```python { .api }
11
class ZMQBaseError(Exception):
12
"""Base exception class for all ZMQ errors in Python."""
13
14
class ZMQError(ZMQBaseError):
15
"""
16
Standard ZMQ error wrapping errno-style errors.
17
18
Attributes:
19
- errno: ZMQ error code (int or None)
20
- strerror: Human-readable error description (str)
21
"""
22
23
def __init__(self, errno: int = None, msg: str = None) -> None:
24
"""
25
Create a ZMQ error.
26
27
Parameters:
28
- errno: ZMQ errno code (None to use zmq_errno())
29
- msg: Custom error message (None for default)
30
"""
31
32
@property
33
def errno(self) -> int:
34
"""ZMQ error code."""
35
36
@property
37
def strerror(self) -> str:
38
"""Human-readable error message."""
39
```
40
41
### Connection Errors
42
43
Specialized exceptions for connection and binding issues.
44
45
```python { .api }
46
class ZMQBindError(ZMQError):
47
"""
48
Error raised when bind() fails.
49
50
Includes additional context about the binding attempt.
51
"""
52
53
def __init__(self, errno: int, msg: str, address: str) -> None:
54
"""
55
Create a bind error.
56
57
Parameters:
58
- errno: ZMQ error code
59
- msg: Error message
60
- address: Address that failed to bind
61
"""
62
63
@property
64
def address(self) -> str:
65
"""Address that failed to bind."""
66
```
67
68
### Version Errors
69
70
Exceptions related to version compatibility issues.
71
72
```python { .api }
73
class ZMQVersionError(ZMQError):
74
"""
75
Error raised when ZMQ version requirements are not met.
76
"""
77
78
def __init__(self, min_version: tuple, actual_version: tuple, msg: str = None) -> None:
79
"""
80
Create a version error.
81
82
Parameters:
83
- min_version: Minimum required version tuple
84
- actual_version: Actual ZMQ version tuple
85
- msg: Custom error message
86
"""
87
88
@property
89
def min_version(self) -> tuple:
90
"""Minimum required version."""
91
92
@property
93
def actual_version(self) -> tuple:
94
"""Actual ZMQ version."""
95
```
96
97
### Async Errors
98
99
Exceptions specific to asynchronous operations.
100
101
```python { .api }
102
class NotDone(ZMQError):
103
"""
104
Error raised when async operation is not yet complete.
105
106
Used with MessageTracker and Future objects.
107
"""
108
109
class Again(ZMQError):
110
"""
111
Error raised for EAGAIN errno (would block).
112
113
Commonly raised with NOBLOCK flag operations.
114
"""
115
```
116
117
### Security Errors
118
119
Exceptions related to authentication and security.
120
121
```python { .api }
122
class ZMQAuthError(ZMQError):
123
"""
124
Base class for authentication-related errors.
125
"""
126
127
class ZAPError(ZMQAuthError):
128
"""
129
Error in ZAP (ZMQ Authentication Protocol) processing.
130
"""
131
132
def __init__(self, status_code: str, status_text: str = None) -> None:
133
"""
134
Create a ZAP error.
135
136
Parameters:
137
- status_code: ZAP status code
138
- status_text: Human-readable status description
139
"""
140
141
@property
142
def status_code(self) -> str:
143
"""ZAP status code."""
144
145
@property
146
def status_text(self) -> str:
147
"""ZAP status description."""
148
```
149
150
### Context Errors
151
152
Exceptions related to context operations.
153
154
```python { .api }
155
class ContextTerminated(ZMQError):
156
"""
157
Error raised when operations are attempted on terminated context.
158
"""
159
160
class InterruptedSystemCall(ZMQError):
161
"""
162
Error raised when ZMQ call is interrupted by system signal.
163
164
Equivalent to EINTR errno.
165
"""
166
```
167
168
### Message Errors
169
170
Exceptions related to message handling.
171
172
```python { .api }
173
class MessageError(ZMQError):
174
"""
175
Base class for message-related errors.
176
"""
177
178
class MessageTooLong(MessageError):
179
"""
180
Error raised when message exceeds size limits.
181
182
Corresponds to EMSGSIZE errno.
183
"""
184
185
class InvalidMessage(MessageError):
186
"""
187
Error raised for malformed or invalid messages.
188
"""
189
```
190
191
### Warning Classes
192
193
Warning categories for non-fatal issues.
194
195
```python { .api }
196
class DraftFDWarning(RuntimeWarning):
197
"""
198
Warning for using experimental FD support on draft sockets.
199
200
Used when socket.FD is accessed on thread-safe sockets.
201
"""
202
203
def __init__(self, msg: str = "") -> None:
204
"""
205
Create a draft FD warning.
206
207
Parameters:
208
- msg: Custom warning message
209
"""
210
```
211
212
## Error Code Mapping
213
214
PyZMQ maps libzmq error codes to Python exceptions:
215
216
```python { .api }
217
# Common errno mappings
218
EAGAIN -> Again # Resource temporarily unavailable
219
EINTR -> InterruptedSystemCall # Interrupted system call
220
ETERM -> ContextTerminated # Context terminated
221
ENOTSUP -> ZMQError # Operation not supported
222
EINVAL -> ZMQError # Invalid argument
223
EMSGSIZE -> MessageTooLong # Message too long
224
EFSM -> ZMQError # Finite state machine error
225
```
226
227
## Usage Examples
228
229
### Basic Error Handling
230
231
```python
232
import zmq
233
234
context = zmq.Context()
235
socket = context.socket(zmq.REQ)
236
237
try:
238
# This will raise ZMQError (no connection)
239
socket.recv(zmq.NOBLOCK)
240
except zmq.Again:
241
print("No message available (would block)")
242
except zmq.ZMQError as e:
243
print(f"ZMQ error {e.errno}: {e.strerror}")
244
finally:
245
socket.close()
246
context.term()
247
```
248
249
### Specific Error Types
250
251
```python
252
import zmq
253
254
context = zmq.Context()
255
socket = context.socket(zmq.REP)
256
257
try:
258
# This may fail if address is already in use
259
socket.bind("tcp://*:5555")
260
except zmq.ZMQBindError as e:
261
print(f"Failed to bind to {e.address}: {e.strerror}")
262
except zmq.ZMQError as e:
263
print(f"Other ZMQ error: {e}")
264
finally:
265
socket.close()
266
context.term()
267
```
268
269
### Non-Blocking Operations
270
271
```python
272
import zmq
273
274
context = zmq.Context()
275
socket = context.socket(zmq.PULL)
276
socket.connect("tcp://localhost:5555")
277
278
while True:
279
try:
280
# Non-blocking receive
281
message = socket.recv(zmq.NOBLOCK)
282
print(f"Received: {message}")
283
except zmq.Again:
284
# No message available, do other work
285
print("No message, continuing...")
286
time.sleep(0.1)
287
except zmq.ZMQError as e:
288
print(f"ZMQ error: {e}")
289
break
290
291
socket.close()
292
context.term()
293
```
294
295
### Context Termination Handling
296
297
```python
298
import zmq
299
import threading
300
import time
301
302
def worker():
303
"""Worker thread that handles context termination"""
304
try:
305
socket = context.socket(zmq.PULL)
306
socket.connect("inproc://work")
307
308
while True:
309
message = socket.recv()
310
print(f"Processing: {message}")
311
except zmq.ContextTerminated:
312
print("Context terminated, worker exiting")
313
except zmq.ZMQError as e:
314
print(f"Worker error: {e}")
315
finally:
316
socket.close()
317
318
context = zmq.Context()
319
320
# Start worker thread
321
worker_thread = threading.Thread(target=worker)
322
worker_thread.start()
323
324
try:
325
time.sleep(2)
326
print("Terminating context...")
327
context.term() # This will cause ContextTerminated in worker
328
329
worker_thread.join()
330
print("Worker thread finished")
331
except KeyboardInterrupt:
332
print("Interrupted")
333
context.term()
334
worker_thread.join()
335
```
336
337
### Version Checking with Error Handling
338
339
```python
340
import zmq
341
342
def require_zmq_version(min_version):
343
"""Ensure minimum ZMQ version or raise error"""
344
actual = zmq.zmq_version_info()
345
346
if actual < min_version:
347
raise zmq.ZMQVersionError(
348
min_version=min_version,
349
actual_version=actual,
350
msg=f"Requires ZMQ {min_version}, got {actual}"
351
)
352
353
try:
354
# Require ZMQ 4.2.0 or later
355
require_zmq_version((4, 2, 0))
356
print("ZMQ version OK")
357
except zmq.ZMQVersionError as e:
358
print(f"Version error: {e}")
359
print(f"Required: {e.min_version}")
360
print(f"Actual: {e.actual_version}")
361
```
362
363
### Message Tracking Error Handling
364
365
```python
366
import zmq
367
368
context = zmq.Context()
369
socket = context.socket(zmq.PUSH)
370
socket.bind("tcp://*:5555")
371
372
try:
373
# Send with tracking
374
tracker = socket.send(b"Large message", track=True)
375
376
# Wait for completion
377
if not tracker.wait(timeout=5000):
378
print("Send timeout")
379
else:
380
print("Message sent successfully")
381
382
except zmq.NotDone:
383
print("Message sending not complete")
384
except zmq.ZMQError as e:
385
print(f"Send error: {e}")
386
finally:
387
socket.close()
388
context.term()
389
```
390
391
### Authentication Error Handling
392
393
```python
394
import zmq
395
from zmq.auth import ThreadAuthenticator
396
397
context = zmq.Context()
398
399
try:
400
# Server with authentication
401
auth = ThreadAuthenticator(context)
402
auth.start()
403
404
passwords = {'user': 'secret'}
405
auth.configure_plain(domain='*', passwords=passwords)
406
407
server = context.socket(zmq.REP)
408
server.set(zmq.PLAIN_SERVER, 1)
409
server.bind("tcp://*:5555")
410
411
# This may raise authentication errors
412
message = server.recv()
413
server.send(b"Authenticated response")
414
415
except zmq.ZMQAuthError as e:
416
print(f"Authentication error: {e}")
417
except zmq.ZMQError as e:
418
print(f"ZMQ error: {e}")
419
finally:
420
if 'server' in locals():
421
server.close()
422
if 'auth' in locals():
423
auth.stop()
424
context.term()
425
```
426
427
### Error Logging and Debugging
428
429
```python
430
import zmq
431
import logging
432
433
# Configure logging
434
logging.basicConfig(level=logging.DEBUG)
435
logger = logging.getLogger(__name__)
436
437
context = zmq.Context()
438
socket = context.socket(zmq.REQ)
439
440
try:
441
socket.connect("tcp://localhost:5555")
442
socket.send_string("Hello")
443
444
# Set receive timeout
445
socket.setsockopt(zmq.RCVTIMEO, 5000)
446
response = socket.recv_string()
447
448
except zmq.Again:
449
logger.warning("Receive timeout - no response from server")
450
except zmq.ZMQError as e:
451
logger.error(f"ZMQ error {e.errno}: {e.strerror}")
452
logger.debug(f"Socket state: {socket.getsockopt(zmq.EVENTS)}")
453
except Exception as e:
454
logger.exception(f"Unexpected error: {e}")
455
finally:
456
socket.close()
457
context.term()
458
```
459
460
### Retry Logic with Error Handling
461
462
```python
463
import zmq
464
import time
465
import random
466
467
def send_with_retry(socket, message, max_retries=3):
468
"""Send message with exponential backoff retry"""
469
470
for attempt in range(max_retries + 1):
471
try:
472
socket.send_string(message, zmq.NOBLOCK)
473
return True
474
475
except zmq.Again:
476
if attempt == max_retries:
477
raise zmq.ZMQError(zmq.EAGAIN, "Max retries exceeded")
478
479
# Exponential backoff with jitter
480
delay = (2 ** attempt) + random.uniform(0, 1)
481
print(f"Send failed, retrying in {delay:.2f}s...")
482
time.sleep(delay)
483
484
except zmq.ZMQError as e:
485
print(f"Non-recoverable error: {e}")
486
raise
487
488
context = zmq.Context()
489
socket = context.socket(zmq.PUSH)
490
socket.setsockopt(zmq.SNDHWM, 1) # Low HWM to trigger blocking
491
492
try:
493
socket.bind("tcp://*:5555")
494
495
# This may block/fail due to low HWM
496
send_with_retry(socket, "Test message")
497
print("Message sent successfully")
498
499
except zmq.ZMQError as e:
500
print(f"Failed to send: {e}")
501
finally:
502
socket.close()
503
context.term()
504
```
505
506
### Custom Error Context
507
508
```python
509
import zmq
510
511
class ZMQOperationError(zmq.ZMQError):
512
"""Custom error with operation context"""
513
514
def __init__(self, operation, errno=None, msg=None):
515
super().__init__(errno, msg)
516
self.operation = operation
517
518
def __str__(self):
519
return f"{self.operation} failed: {self.strerror}"
520
521
def safe_socket_operation(socket, operation, *args, **kwargs):
522
"""Wrapper for socket operations with enhanced error context"""
523
524
try:
525
method = getattr(socket, operation)
526
return method(*args, **kwargs)
527
528
except zmq.ZMQError as e:
529
# Re-raise with operation context
530
raise ZMQOperationError(
531
operation=operation,
532
errno=e.errno,
533
msg=f"Operation '{operation}' failed: {e.strerror}"
534
) from e
535
536
context = zmq.Context()
537
socket = context.socket(zmq.REQ)
538
539
try:
540
safe_socket_operation(socket, 'connect', "tcp://localhost:5555")
541
safe_socket_operation(socket, 'send_string', "Hello")
542
response = safe_socket_operation(socket, 'recv_string')
543
print(f"Response: {response}")
544
545
except ZMQOperationError as e:
546
print(f"Operation error: {e}")
547
print(f"Failed operation: {e.operation}")
548
except zmq.ZMQError as e:
549
print(f"ZMQ error: {e}")
550
finally:
551
socket.close()
552
context.term()
553
```
554
555
### Error Recovery Patterns
556
557
```python
558
import zmq
559
import time
560
561
class RobustSocket:
562
"""Socket wrapper with automatic error recovery"""
563
564
def __init__(self, context, socket_type, address):
565
self.context = context
566
self.socket_type = socket_type
567
self.address = address
568
self.socket = None
569
self._connect()
570
571
def _connect(self):
572
"""Create and connect socket"""
573
if self.socket:
574
self.socket.close()
575
576
self.socket = self.context.socket(self.socket_type)
577
self.socket.setsockopt(zmq.LINGER, 0)
578
self.socket.connect(self.address)
579
580
def send_reliable(self, message, timeout=5000):
581
"""Send with automatic recovery on failure"""
582
583
for attempt in range(3):
584
try:
585
self.socket.send_string(message, zmq.NOBLOCK)
586
return True
587
588
except zmq.Again:
589
# Socket may be blocked, try recreating
590
print(f"Send blocked, recreating socket (attempt {attempt + 1})")
591
self._connect()
592
continue
593
594
except zmq.ZMQError as e:
595
if e.errno == zmq.ETERM:
596
print("Context terminated")
597
return False
598
elif attempt == 2:
599
raise
600
else:
601
print(f"ZMQ error, retrying: {e}")
602
self._connect()
603
time.sleep(0.1)
604
605
return False
606
607
def close(self):
608
if self.socket:
609
self.socket.close()
610
611
# Usage
612
context = zmq.Context()
613
614
try:
615
robust_socket = RobustSocket(context, zmq.REQ, "tcp://localhost:5555")
616
617
success = robust_socket.send_reliable("Test message")
618
if success:
619
print("Message sent successfully")
620
else:
621
print("Failed to send message")
622
623
except zmq.ZMQError as e:
624
print(f"Unrecoverable error: {e}")
625
finally:
626
robust_socket.close()
627
context.term()
628
```
629
630
## Error Code Reference
631
632
Common ZMQ error codes and their meanings:
633
634
```python
635
import zmq
636
637
# Network errors
638
zmq.EADDRINUSE # Address already in use
639
zmq.EADDRNOTAVAIL # Cannot assign requested address
640
zmq.ECONNREFUSED # Connection refused
641
zmq.ENETUNREACH # Network is unreachable
642
zmq.ETIMEDOUT # Connection timed out
643
644
# State errors
645
zmq.EFSM # Operation not valid in current state
646
zmq.ETERM # Context was terminated
647
zmq.EAGAIN # Resource temporarily unavailable
648
649
# Configuration errors
650
zmq.EINVAL # Invalid argument
651
zmq.ENOTSUP # Operation not supported
652
zmq.EMSGSIZE # Message too long
653
654
# System errors
655
zmq.EINTR # Interrupted system call
656
zmq.ENOMEM # Cannot allocate memory
657
zmq.EFAULT # Bad address
658
```
659
660
## Types
661
662
```python { .api }
663
from typing import Optional, Union, Tuple
664
import errno
665
666
# Error types
667
ErrorCode = int
668
ErrorMessage = str
669
ZMQErrorNumber = Union[int, None]
670
671
# Version types
672
VersionTuple = Tuple[int, int, int]
673
674
# Address types
675
BindAddress = str
676
ConnectAddress = str
677
678
# Authentication types
679
ZAPStatusCode = str
680
ZAPStatusText = str
681
682
# Message types
683
MessageContent = Union[bytes, str]
684
MessageSize = int
685
686
# Context types
687
OperationName = str
688
OperationContext = dict
689
```