0
# Error Handling
1
2
Comprehensive error handling with custom exceptions, error policies, and retry mechanisms for robust messaging applications that can gracefully handle network issues, authentication failures, and protocol errors.
3
4
## Capabilities
5
6
### Error Policy Configuration
7
8
Configurable error handling policies that define how clients should respond to different types of errors.
9
10
```python { .api }
11
class ErrorPolicy:
12
def __init__(self, max_retries=3, on_error=None):
13
"""
14
Error handling policy configuration.
15
16
Parameters:
17
- max_retries (int): Maximum number of retry attempts
18
- on_error (ErrorAction): Default error action to take
19
"""
20
21
def on_unrecognized_error(self, error):
22
"""Handle unrecognized errors."""
23
24
def on_message_error(self, error):
25
"""Handle message-level errors."""
26
27
def on_link_error(self, error):
28
"""Handle link-level errors."""
29
30
def on_connection_error(self, error):
31
"""Handle connection-level errors."""
32
```
33
34
### Error Actions
35
36
Configuration for specific error handling actions including retry behavior and backoff strategies.
37
38
```python { .api }
39
class ErrorAction:
40
def __init__(self, retry=True, backoff=1.0, increment_retries=True):
41
"""
42
Error action specification.
43
44
Parameters:
45
- retry (bool): Whether to retry the operation
46
- backoff (float): Backoff delay in seconds
47
- increment_retries (bool): Whether to increment retry counter
48
"""
49
50
@property
51
def retry: bool
52
"""Whether to retry the failed operation."""
53
54
@property
55
def backoff: float
56
"""Backoff delay before retry in seconds."""
57
58
@property
59
def increment_retries: bool
60
"""Whether to count this as a retry attempt."""
61
```
62
63
**Usage Example:**
64
65
```python
66
from uamqp.errors import ErrorPolicy, ErrorAction
67
68
# Custom error policy with exponential backoff
69
def create_retry_action(attempt):
70
backoff_delay = min(2.0 ** attempt, 60.0) # Cap at 60 seconds
71
return ErrorAction(retry=True, backoff=backoff_delay)
72
73
error_policy = ErrorPolicy(
74
max_retries=5,
75
on_error=ErrorAction(retry=True, backoff=1.0)
76
)
77
78
# Use with client
79
from uamqp import SendClient
80
client = SendClient(target, auth=auth, error_policy=error_policy)
81
```
82
83
## Exception Hierarchy
84
85
### Base AMQP Errors
86
87
Root exception classes for AMQP-related errors.
88
89
```python { .api }
90
class AMQPError(Exception):
91
"""Base exception for all AMQP-related errors."""
92
93
class AMQPConnectionError(AMQPError):
94
"""Connection-related errors including network and protocol issues."""
95
96
class AMQPClientShutdown(AMQPError):
97
"""Exception raised when client is shutting down."""
98
99
class MessageHandlerError(AMQPError):
100
"""Errors in message handling and processing."""
101
```
102
103
### Connection Errors
104
105
Errors related to AMQP connection management and lifecycle.
106
107
```python { .api }
108
class ConnectionClose(AMQPConnectionError):
109
"""Connection was closed by the broker."""
110
111
class VendorConnectionClose(ConnectionClose):
112
"""Vendor-specific connection closure."""
113
```
114
115
### Link Errors
116
117
Errors related to AMQP links (message senders and receivers).
118
119
```python { .api }
120
class LinkDetach(AMQPError):
121
"""Link was detached by remote peer."""
122
123
class VendorLinkDetach(LinkDetach):
124
"""Vendor-specific link detachment."""
125
126
class LinkRedirect(AMQPError):
127
"""Link redirected to different endpoint."""
128
```
129
130
### Authentication Errors
131
132
Errors related to authentication and authorization.
133
134
```python { .api }
135
class AuthenticationException(AMQPError):
136
"""General authentication failure."""
137
138
class TokenExpired(AuthenticationException):
139
"""Authentication token has expired."""
140
141
class TokenAuthFailure(AuthenticationException):
142
"""Token authentication specifically failed."""
143
```
144
145
### Message Errors
146
147
Errors related to message processing and state management.
148
149
```python { .api }
150
class MessageResponse(AMQPError):
151
"""Base class for message response errors."""
152
153
class MessageException(MessageResponse):
154
"""General message processing error."""
155
156
class MessageSendFailed(MessageException):
157
"""Message sending operation failed."""
158
159
class MessageContentTooLarge(MessageException):
160
"""Message exceeds maximum size limit."""
161
```
162
163
### Message Settlement Responses
164
165
Specific responses to message settlement that may be treated as exceptions.
166
167
```python { .api }
168
class MessageAlreadySettled(MessageResponse):
169
"""Attempt to settle already-settled message."""
170
171
class MessageAccepted(MessageResponse):
172
"""Message was accepted by receiver."""
173
174
class MessageRejected(MessageResponse):
175
"""Message was rejected by receiver."""
176
177
class MessageReleased(MessageResponse):
178
"""Message was released by receiver."""
179
180
class MessageModified(MessageResponse):
181
"""Message was modified by receiver."""
182
```
183
184
### Timeout Errors
185
186
Errors related to operation timeouts.
187
188
```python { .api }
189
class ClientTimeout(AMQPError):
190
"""Client operation timed out."""
191
```
192
193
## Error Handling Patterns
194
195
### Basic Exception Handling
196
197
Handle common AMQP errors with appropriate recovery strategies.
198
199
```python
200
from uamqp import send_message, receive_message
201
from uamqp.errors import (
202
AMQPConnectionError,
203
AuthenticationException,
204
MessageSendFailed,
205
ClientTimeout
206
)
207
208
def robust_send_message(target, message, auth):
209
max_attempts = 3
210
211
for attempt in range(max_attempts):
212
try:
213
result = send_message(target, message, auth=auth)
214
print(f"Message sent successfully: {result}")
215
return result
216
217
except AMQPConnectionError as e:
218
print(f"Connection error (attempt {attempt + 1}): {e}")
219
if attempt == max_attempts - 1:
220
raise
221
time.sleep(2 ** attempt) # Exponential backoff
222
223
except AuthenticationException as e:
224
print(f"Authentication failed: {e}")
225
# Don't retry auth failures
226
raise
227
228
except MessageSendFailed as e:
229
print(f"Send failed (attempt {attempt + 1}): {e}")
230
if attempt == max_attempts - 1:
231
raise
232
time.sleep(1)
233
234
except ClientTimeout as e:
235
print(f"Timeout (attempt {attempt + 1}): {e}")
236
if attempt == max_attempts - 1:
237
raise
238
```
239
240
### Message Settlement Error Handling
241
242
Handle message settlement responses appropriately.
243
244
```python
245
from uamqp.errors import (
246
MessageRejected,
247
MessageReleased,
248
MessageModified,
249
MessageAlreadySettled
250
)
251
252
def process_message_safely(message):
253
try:
254
# Process message content
255
data = message.get_data()
256
result = process_business_logic(data)
257
258
# Accept message on successful processing
259
message.accept()
260
return result
261
262
except ValueError as e:
263
# Business logic error - reject message
264
print(f"Invalid message data: {e}")
265
message.reject(
266
condition="invalid-data",
267
description=str(e)
268
)
269
270
except MessageRejected as e:
271
print(f"Message was rejected: {e}")
272
# Log for analysis, don't reprocess
273
274
except MessageReleased as e:
275
print(f"Message was released: {e}")
276
# Message will be redelivered
277
278
except MessageAlreadySettled as e:
279
print(f"Message already settled: {e}")
280
# Continue processing, message already handled
281
282
except Exception as e:
283
# Unexpected error - release message for retry
284
print(f"Unexpected error: {e}")
285
try:
286
message.release()
287
except MessageAlreadySettled:
288
pass # Message already handled
289
```
290
291
### Client Error Handling with Policies
292
293
Use error policies for automatic retry behavior.
294
295
```python
296
from uamqp import SendClient
297
from uamqp.errors import ErrorPolicy, ErrorAction
298
299
def create_resilient_client(target, auth):
300
# Define error actions for different scenarios
301
retry_action = ErrorAction(retry=True, backoff=2.0)
302
no_retry_action = ErrorAction(retry=False)
303
304
# Create custom error policy
305
error_policy = ErrorPolicy(max_retries=3)
306
307
# Override specific error handling
308
def custom_connection_error_handler(error):
309
if "authentication" in str(error).lower():
310
return no_retry_action # Don't retry auth errors
311
return retry_action # Retry other connection errors
312
313
error_policy.on_connection_error = custom_connection_error_handler
314
315
return SendClient(target, auth=auth, error_policy=error_policy)
316
317
# Usage
318
try:
319
with create_resilient_client(target, auth) as client:
320
client.queue_message(message)
321
results = client.send_all_messages()
322
except AMQPError as e:
323
print(f"All retry attempts failed: {e}")
324
```
325
326
### Async Error Handling
327
328
Handle errors in async operations with proper coroutine error management.
329
330
```python
331
import asyncio
332
from uamqp.async_ops import SendClientAsync
333
from uamqp.errors import AMQPConnectionError, MessageSendFailed
334
335
async def robust_async_send(target, messages, auth):
336
max_retries = 3
337
338
for attempt in range(max_retries):
339
try:
340
async with SendClientAsync(target, auth=auth) as client:
341
results = await client.send_message_batch_async(messages)
342
print(f"Sent {len(results)} messages successfully")
343
return results
344
345
except AMQPConnectionError as e:
346
print(f"Connection error (attempt {attempt + 1}): {e}")
347
if attempt == max_retries - 1:
348
raise
349
await asyncio.sleep(2 ** attempt) # Exponential backoff
350
351
except MessageSendFailed as e:
352
print(f"Send failed (attempt {attempt + 1}): {e}")
353
if attempt == max_retries - 1:
354
raise
355
await asyncio.sleep(1)
356
357
# Usage in async context
358
async def main():
359
try:
360
results = await robust_async_send(target, messages, auth)
361
except AMQPError as e:
362
print(f"Failed after all retries: {e}")
363
364
asyncio.run(main())
365
```
366
367
### Context Manager Error Handling
368
369
Proper resource cleanup with error handling in context managers.
370
371
```python
372
from uamqp import ReceiveClient
373
from uamqp.errors import AMQPError
374
375
class RobustReceiveClient:
376
def __init__(self, source, auth, **kwargs):
377
self.source = source
378
self.auth = auth
379
self.client_kwargs = kwargs
380
self.client = None
381
382
def __enter__(self):
383
try:
384
self.client = ReceiveClient(self.source, auth=self.auth, **self.client_kwargs)
385
self.client.open()
386
return self.client
387
except AMQPError:
388
if self.client:
389
try:
390
self.client.close()
391
except:
392
pass # Ignore cleanup errors
393
raise
394
395
def __exit__(self, exc_type, exc_val, exc_tb):
396
if self.client:
397
try:
398
self.client.close()
399
except AMQPError as e:
400
print(f"Error during cleanup: {e}")
401
# Don't re-raise cleanup errors
402
403
# Handle different exception types
404
if exc_type == AMQPConnectionError:
405
print("Connection error occurred, consider retry")
406
return False # Re-raise the exception
407
elif exc_type == AuthenticationException:
408
print("Authentication failed, check credentials")
409
return False
410
411
return False # Don't suppress other exceptions
412
413
# Usage
414
try:
415
with RobustReceiveClient(source, auth) as client:
416
messages = client.receive_message_batch(timeout=30000)
417
for message in messages:
418
process_message_safely(message)
419
except AMQPError as e:
420
print(f"Client operation failed: {e}")
421
```
422
423
## Debugging and Diagnostics
424
425
### Enable Debug Logging
426
427
Use debug mode to get detailed protocol-level information for troubleshooting.
428
429
```python
430
import logging
431
432
# Enable uAMQP debug logging
433
logging.basicConfig(level=logging.INFO)
434
logger = logging.getLogger('uamqp')
435
logger.setLevel(logging.DEBUG)
436
437
# Use debug mode in clients
438
client = SendClient(target, auth=auth, debug=True)
439
```
440
441
### Error Information Extraction
442
443
Extract detailed error information for analysis and logging.
444
445
```python
446
def log_error_details(error):
447
print(f"Error type: {type(error).__name__}")
448
print(f"Error message: {str(error)}")
449
450
# Check for AMQP-specific error details
451
if hasattr(error, 'condition'):
452
print(f"AMQP condition: {error.condition}")
453
if hasattr(error, 'description'):
454
print(f"AMQP description: {error.description}")
455
if hasattr(error, 'info'):
456
print(f"AMQP info: {error.info}")
457
458
try:
459
# AMQP operation
460
pass
461
except AMQPError as e:
462
log_error_details(e)
463
# Decide on recovery action based on error details
464
```
465
466
### Health Monitoring
467
468
Implement health checks and monitoring for AMQP connections.
469
470
```python
471
import time
472
from uamqp.errors import AMQPError
473
474
class ConnectionHealthMonitor:
475
def __init__(self, target, auth):
476
self.target = target
477
self.auth = auth
478
self.last_error = None
479
self.error_count = 0
480
481
def check_health(self):
482
try:
483
# Simple health check using high-level API
484
from uamqp import send_message, Message
485
test_message = Message("health-check")
486
487
result = send_message(self.target, test_message, auth=self.auth)
488
489
# Reset error tracking on success
490
self.last_error = None
491
self.error_count = 0
492
return True
493
494
except AMQPError as e:
495
self.last_error = e
496
self.error_count += 1
497
return False
498
499
def get_health_status(self):
500
return {
501
'healthy': self.last_error is None,
502
'last_error': str(self.last_error) if self.last_error else None,
503
'error_count': self.error_count,
504
'last_check': time.time()
505
}
506
507
# Usage
508
monitor = ConnectionHealthMonitor(target, auth)
509
if not monitor.check_health():
510
status = monitor.get_health_status()
511
print(f"Connection unhealthy: {status}")
512
```