# Exception Handling

Kombu provides a structured exception hierarchy for messaging errors, connection issues, and serialization problems, so that applications can identify and handle each kind of failure separately.

## Capabilities

### Base Exception Classes

Foundation exception classes that provide the base hierarchy for all Kombu-specific errors.

```python { .api }
class KombuError(Exception):
    """
    Common base class for Kombu exceptions.

    Most Kombu-specific exceptions inherit from this class,
    so catching it handles the majority of Kombu-related errors.
    """

class OperationalError(KombuError):
    """
    Recoverable message transport connection error.

    Indicates a temporary error that may be resolved by retrying
    the operation, such as network connectivity issues or
    temporary broker unavailability.
    """
```

### Serialization Exceptions

Exceptions related to message serialization and deserialization failures.

```python { .api }
class SerializationError(KombuError):
    """
    Failed to serialize or deserialize message content.

    Base class for all serialization-related errors.
    """

class EncodeError(SerializationError):
    """
    Cannot encode object for serialization.

    Raised when an object cannot be serialized using the
    specified serialization method.
    """

class DecodeError(SerializationError):
    """
    Cannot decode serialized data.

    Raised when serialized data cannot be deserialized,
    either due to corruption or incompatible format.
    """
```

### Entity and Channel Exceptions

Exceptions related to AMQP entities, channels, and binding operations.

```python { .api }
class NotBoundError(KombuError):
    """
    Trying to call channel method on unbound entity.

    Raised when attempting to perform operations on exchanges,
    queues, or other entities that haven't been bound to a channel.
    """

class MessageStateError(KombuError):
    """
    Message already acknowledged or in invalid state.

    Raised when attempting to acknowledge, reject, or requeue
    a message that has already been processed.
    """
```

### Resource Limit Exceptions

Exceptions related to resource limits and capacity constraints.

```python { .api }
class LimitExceeded(KombuError):
    """
    Generic limit exceeded error.

    Base class for various limit-related exceptions.
    """

class ConnectionLimitExceeded(LimitExceeded):
    """
    Maximum number of simultaneous connections exceeded.

    Raised when attempting to create more connections than
    allowed by broker or client configuration.
    """

class ChannelLimitExceeded(LimitExceeded):
    """
    Maximum number of channels per connection exceeded.

    Raised when attempting to create more channels than
    allowed per connection.
    """
```

### Version and Compatibility Exceptions

Exceptions related to version mismatches and compatibility issues.

```python { .api }
class VersionMismatch(KombuError):
    """
    Library dependency version mismatch.

    Raised when required library versions are incompatible
    with current Kombu version or each other.
    """

class SerializerNotInstalled(KombuError):
    """
    Required serialization library not installed.

    Raised when attempting to use a serializer (like msgpack
    or yaml) that requires additional packages not installed.
    Inherits from KombuError directly, so it is not caught by
    `except SerializationError`.
    """
```

### Content and Security Exceptions

Exceptions related to content filtering and security restrictions.

```python { .api }
class ContentDisallowed(SerializerNotInstalled):
    """
    Consumer doesn't accept this content type.

    Raised when attempting to deliver a message with content
    type not in the consumer's accept list.
    """

class InconsistencyError(ConnectionError):
    """
    Data or environment inconsistency detected.

    Raised when internal state inconsistencies are detected
    that may indicate configuration or data corruption issues.
    The base class is the ConnectionError re-exported by
    kombu.exceptions (from py-amqp), not KombuError.
    """
```

### HTTP and Network Exceptions

Exceptions related to HTTP transports and network operations.

```python { .api }
class HttpError(Exception):
    """
    HTTP client error.

    Raised by HTTP-based transports when HTTP operations fail.
    Inherits from Exception directly, so it is not caught by
    `except KombuError`.
    """
```

### Utility Functions

Helper functions for exception handling and re-raising.

```python { .api }
def reraise(tp, value, tb=None):
    """
    Reraise exception with preserved traceback.

    Parameters:
    - tp (type): Exception type
    - value (Exception): Exception instance
    - tb (traceback): Traceback object (optional)

    Raises:
    The provided exception with preserved traceback information.
    """
```
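
As a rough illustration of what the signature above implies, the following standard-library sketch stores an exception and later re-raises it with its original traceback. `reraise_sketch` and `capture_failure` are hypothetical names introduced here; the sketch assumes the common Python 3 idiom `value.with_traceback(tb)` and is not Kombu's own implementation.

```python
import sys
import traceback


def reraise_sketch(tp, value, tb=None):
    """Stand-in with the same signature as reraise(); not Kombu's code."""
    # tp is accepted only to mirror the documented signature
    raise value.with_traceback(tb)


def capture_failure():
    """Run a failing operation and return its exc_info triple for later use."""
    try:
        int("not a number")
    except ValueError:
        return sys.exc_info()


tp, value, tb = capture_failure()
try:
    reraise_sketch(tp, value, tb)
except ValueError as exc:
    # The frame where the error first occurred is still in the traceback
    frames = [frame.name for frame in traceback.extract_tb(exc.__traceback__)]
    print(frames)
```

Passing the stored traceback keeps the frame of `capture_failure` visible when the error finally surfaces, which helps when an exception is caught in one place and reported in another.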

## Usage Examples

### Basic Exception Handling

```python
from kombu import Connection, Producer, Queue
from kombu.exceptions import (
    KombuError, OperationalError, SerializationError, NotBoundError
)

def robust_message_handling():
    try:
        with Connection('redis://localhost:6379/0') as conn:
            channel = conn.channel()
            queue = Queue('test_queue')

            # Passing the channel explicitly avoids NotBoundError, which
            # is raised when declare() is called on an unbound queue
            queue.declare(channel=channel)

            producer = Producer(channel)
            # With the default exchange, the routing key is the queue name
            producer.publish({'message': 'hello'}, routing_key='test_queue')

    except OperationalError as e:
        print(f"Connection/transport error: {e}")
        # Could implement retry logic here

    except SerializationError as e:
        print(f"Serialization error: {e}")
        # Handle serialization issues

    except NotBoundError as e:
        print(f"Entity not bound to channel: {e}")
        # Fix binding issues

    except KombuError as e:
        print(f"Generic Kombu error: {e}")
        # Handle any other Kombu-specific errors

    except Exception as e:
        print(f"Unexpected error: {e}")
        # Handle non-Kombu errors
```

### Message Processing Error Handling

```python
import socket

from kombu import Connection, Consumer, Queue
from kombu.exceptions import MessageStateError

def safe_message_processor(body, message):
    """Process message with comprehensive error handling"""
    try:
        # Process the message
        process_business_logic(body)

        # Acknowledge successful processing
        try:
            message.ack()
        except MessageStateError:
            print("Message already acknowledged")

    except ValueError as e:
        print(f"Business logic error: {e}")
        # Requeue for retry
        try:
            message.reject(requeue=True)
        except MessageStateError:
            pass  # Already processed

    except Exception as e:
        print(f"Unexpected processing error: {e}")
        # Decide whether to requeue or reject
        try:
            message.reject(requeue=False)  # Don't requeue unknown errors
        except MessageStateError:
            pass

def handle_decode_error(message, exc):
    """Handle a message whose body could not be decoded.

    Decoding happens before the callbacks run, so decode failures are
    reported through the Consumer's on_decode_error hook instead.
    """
    print(f"Failed to decode message: {exc}")
    # Reject malformed messages without requeue
    try:
        message.reject(requeue=False)
    except MessageStateError:
        pass  # Already processed

def process_business_logic(data):
    """Business logic that might fail"""
    if not isinstance(data, dict):
        raise ValueError("Expected dict data")

    if 'required_field' not in data:
        raise ValueError("Missing required field")

    return {'processed': True, 'result': data['required_field'] * 2}

# Usage
with Connection('redis://localhost:6379/0') as conn:
    queue = Queue('error_handling_queue')
    consumer = Consumer(
        conn.channel(), [queue],
        callbacks=[safe_message_processor],
        on_decode_error=handle_decode_error,
    )
    consumer.consume()

    # Process messages with error handling
    try:
        conn.drain_events(timeout=1.0)
    except socket.timeout:
        print("No messages received within the timeout")
```

### Connection and Transport Error Handling

```python
import random
import time

from kombu import Connection
from kombu.exceptions import OperationalError, ConnectionLimitExceeded

def robust_connection_handler(broker_url, max_retries=5):
    """Handle connection with retry logic"""
    retry_count = 0
    backoff_base = 1

    conn = Connection(broker_url)
    # connect() surfaces the transport's own connection errors, while
    # wrappers such as ensure_connection() raise OperationalError;
    # treat both as retryable
    retryable_errors = (OperationalError, *conn.connection_errors)

    while retry_count < max_retries:
        try:
            conn.connect()  # Explicit connection

            print("Connection established successfully")
            return conn

        except ConnectionLimitExceeded as e:
            print(f"Connection limit exceeded: {e}")
            # This might not be retryable
            time.sleep(backoff_base * (2 ** retry_count))
            retry_count += 1

        except retryable_errors as e:
            print(f"Operational error (attempt {retry_count + 1}): {e}")

            # Exponential backoff with jitter
            sleep_time = backoff_base * (2 ** retry_count) + random.uniform(0, 1)
            print(f"Retrying in {sleep_time:.2f} seconds...")
            time.sleep(sleep_time)
            retry_count += 1

        except Exception as e:
            print(f"Unexpected connection error: {e}")
            return None

    print(f"Failed to establish connection after {max_retries} attempts")
    return None

# Usage
conn = robust_connection_handler('redis://localhost:6379/0')
if conn:
    # The context manager closes the connection on exit
    with conn:
        # Perform operations
        pass
```
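
The backoff arithmetic in the handler above can be factored into a small helper that is easy to exercise without a broker. The following is a sketch using only the standard library: `backoff_delays` and `retry_call` are hypothetical names, and the delay cap (`max_delay`) and injectable `sleep` are additions that are not part of the original example.

```python
import random
import time


def backoff_delays(max_retries, base=1.0, max_delay=30.0, jitter=1.0):
    """Yield one delay per retry: base * 2**attempt plus jitter, capped."""
    for attempt in range(max_retries):
        delay = base * (2 ** attempt) + random.uniform(0, jitter)
        yield min(delay, max_delay)


def retry_call(func, retry_on, max_retries=5, sleep=time.sleep, **backoff):
    """Call func(); on an exception listed in retry_on, wait and try again.

    Makes at most max_retries + 1 attempts and re-raises the last error.
    """
    delays = backoff_delays(max_retries, **backoff)
    while True:
        try:
            return func()
        except retry_on:
            delay = next(delays, None)
            if delay is None:
                raise  # Retries exhausted; surface the last error
            sleep(delay)


# Usage with a stand-in operation that fails twice before succeeding
attempts = []

def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("broker unavailable")
    return "connected"

waits = []  # Record the delays instead of actually sleeping
result = retry_call(flaky, (ConnectionError,), sleep=waits.append, jitter=0)
print(result, waits)
```

With Kombu, `retry_on` would presumably be the same tuple of retryable errors used in the handler above. Kombu's own `Connection.ensure_connection()` also provides built-in retries, which may make a hand-written loop unnecessary.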

### Serialization Error Handling

```python
from kombu.serialization import dumps, loads
from kombu.exceptions import EncodeError, DecodeError, SerializerNotInstalled

def safe_serialization_test():
    """Test serialization with error handling"""

    # Test data
    serializable_data = {'message': 'hello', 'number': 42}
    unserializable_data = {'function': lambda x: x}  # Functions can't be serialized with JSON

    # Test JSON serialization (safe)
    try:
        # dumps() returns (content_type, content_encoding, payload)
        content_type, encoding, serialized = dumps(serializable_data, 'json')
        deserialized = loads(serialized, content_type, encoding)
        print(f"JSON serialization successful: {deserialized}")
    except EncodeError as e:
        print(f"JSON encode error: {e}")
    except DecodeError as e:
        print(f"JSON decode error: {e}")

    # Test with unserializable data
    try:
        dumps(unserializable_data, 'json')
    except EncodeError as e:
        print(f"Expected JSON encode error: {e}")

    # Test unavailable serializer
    try:
        dumps(serializable_data, 'nonexistent')
    except SerializerNotInstalled as e:
        print(f"Serializer not available: {e}")

    # Test corrupted data
    try:
        corrupted_data = b'{"invalid": json'
        # loads() requires the content encoding as its third argument
        loads(corrupted_data, 'application/json', 'utf-8')
    except DecodeError as e:
        print(f"Expected decode error: {e}")

safe_serialization_test()
```

### Exception Logging and Monitoring

```python
import logging
import socket
import time
import traceback

from kombu import Connection, Consumer, Queue
from kombu.exceptions import KombuError, OperationalError, SerializationError

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ErrorTrackingConsumer:
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        self.error_counts = {}

    def process_message(self, body, message):
        """Process message with detailed error tracking"""
        message_id = body.get('id', 'unknown')

        try:
            # Simulate processing
            if body.get('should_fail'):
                raise ValueError("Simulated processing failure")

            logger.info(f"Successfully processed message {message_id}")
            message.ack()

        except Exception as exc:
            self.handle_processing_error(exc, message, message_id)

    def handle_processing_error(self, exc, message, message_id):
        """Handle and log processing errors"""
        error_type = type(exc).__name__

        # Track error counts
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1

        # Log error details
        logger.error(f"Processing error for message {message_id}: {exc}")
        logger.error(f"Error type: {error_type}")
        logger.error(f"Total {error_type} errors: {self.error_counts[error_type]}")

        # Log stack trace for debugging
        logger.debug(traceback.format_exc())

        # Handle different error types (subclasses before base classes)
        if isinstance(exc, SerializationError):
            logger.error("Serialization error - rejecting message")
            message.reject(requeue=False)

        elif isinstance(exc, ValueError):
            logger.warning("Business logic error - requeuing for retry")
            message.reject(requeue=True)

        elif isinstance(exc, KombuError):
            logger.error("Kombu-specific error - investigating")
            message.reject(requeue=False)

        else:
            logger.error("Unknown error type - rejecting without requeue")
            message.reject(requeue=False)

    def run(self):
        """Main consumer loop with connection error handling"""
        while True:
            try:
                consumer = Consumer(
                    self.connection.channel(),
                    self.queues,
                    callbacks=[self.process_message]
                )

                consumer.consume()

                while True:
                    try:
                        self.connection.drain_events(timeout=1.0)
                    except socket.timeout:
                        continue  # No message arrived in time; keep polling

            # drain_events() can also raise the transport's native errors
            except (OperationalError, *self.connection.connection_errors) as e:
                logger.error(f"Connection error: {e}")
                logger.info("Attempting to reconnect...")
                time.sleep(5)
                continue

            except KeyboardInterrupt:
                logger.info("Shutting down consumer...")
                break

            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                logger.error(traceback.format_exc())
                break

        # Print final error summary
        if self.error_counts:
            logger.info("Error summary:")
            for error_type, count in self.error_counts.items():
                logger.info(f"  {error_type}: {count} occurrences")

# Usage
if __name__ == '__main__':
    with Connection('redis://localhost:6379/0') as conn:
        queue = Queue('error_tracking_queue')
        consumer = ErrorTrackingConsumer(conn, [queue])
        consumer.run()
```
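
The `isinstance` chain in `handle_processing_error` depends on checking subclasses before their base classes. One way to make that ordering explicit is a small policy table, sketched below with built-in exceptions as stand-ins so that it runs without Kombu. `REQUEUE_POLICY` and `should_requeue` are hypothetical names introduced for illustration.

```python
# Ordered (exception type, requeue?) pairs. The first matching entry wins,
# so subclasses must be listed before their base classes.
REQUEUE_POLICY = [
    (KeyError, False),     # Stand-in for a specific, non-retryable error
    (LookupError, True),   # Stand-in for its broader base class
    (ValueError, True),
]


def should_requeue(exc, policy=REQUEUE_POLICY, default=False):
    """Return the requeue decision of the first policy entry matching exc."""
    for exc_type, requeue in policy:
        if isinstance(exc, exc_type):
            return requeue
    return default


print(should_requeue(KeyError("id")))        # Matched by the KeyError entry
print(should_requeue(IndexError("offset")))  # Falls through to LookupError
print(should_requeue(RuntimeError("boom")))  # No entry matches; default applies
```

In a consumer callback, the returned value could be passed straight to `message.reject(requeue=...)`, with the Kombu exception classes taking the place of the stand-ins.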

### Custom Exception Handling

```python
import logging

from kombu.exceptions import KombuError

logger = logging.getLogger(__name__)

class CustomProcessingError(KombuError):
    """Custom error for application-specific failures"""
    def __init__(self, message, error_code=None, retry_after=None):
        super().__init__(message)
        self.error_code = error_code
        self.retry_after = retry_after

class DataValidationError(CustomProcessingError):
    """Error for data validation failures"""
    pass

class ExternalServiceError(CustomProcessingError):
    """Error for external service failures"""
    pass

def process_with_custom_errors(body, message):
    """Process message with custom error types"""
    try:
        # Validate data
        if not body.get('user_id'):
            raise DataValidationError(
                "Missing user_id field",
                error_code='MISSING_USER_ID'
            )

        # Call external service
        if body.get('external_service_down'):
            raise ExternalServiceError(
                "External service unavailable",
                error_code='SERVICE_DOWN',
                retry_after=300  # Retry after 5 minutes
            )

        # Process successfully
        message.ack()

    except DataValidationError as e:
        logger.error(f"Validation error: {e} (code: {e.error_code})")
        message.reject(requeue=False)  # Don't retry validation errors

    except ExternalServiceError as e:
        logger.warning(f"Service error: {e} (retry after: {e.retry_after}s)")
        message.reject(requeue=True)  # Retry service errors

    except CustomProcessingError as e:
        logger.error(f"Custom processing error: {e}")
        message.reject(requeue=False)

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        message.reject(requeue=False)
```

### Exception Context and Debugging

```python
import traceback

from kombu import Queue
from kombu.exceptions import KombuError

def debug_kombu_exceptions():
    """Demonstrate exception information and debugging"""

    try:
        # Declaring a queue that is not bound to a channel raises
        # NotBoundError, a KombuError subclass, without needing a broker
        Queue('debug_queue').declare()

    except KombuError as e:
        print("Kombu Exception Details:")
        print(f"  Type: {type(e).__name__}")
        print(f"  Message: {str(e)}")
        print(f"  Module: {e.__class__.__module__}")

        # Print exception hierarchy
        print("  Exception hierarchy:")
        for cls in type(e).__mro__:
            if cls is object:
                break
            print(f"    {cls.__name__}")

        # Print traceback for debugging
        print("  Traceback:")
        traceback.print_exc()

    except Exception as e:
        print(f"Non-Kombu exception: {type(e).__name__}: {e}")

debug_kombu_exceptions()
```