Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.
—
Comprehensive exception hierarchy for handling messaging errors, connection issues, and serialization problems. Kombu provides a structured set of exceptions that help identify and handle different types of failures in messaging applications.
Foundation exception classes that provide the base hierarchy for all Kombu-specific errors.
class KombuError(Exception):
"""
Common base class for all Kombu exceptions.
All Kombu-specific exceptions inherit from this class,
making it easy to catch any Kombu-related error.
"""
class OperationalError(KombuError):
"""
Recoverable message transport connection error.
Indicates a temporary error that may be resolved by retrying
the operation, such as network connectivity issues or
temporary broker unavailability.
"""Exceptions related to message serialization and deserialization failures.
class SerializationError(KombuError):
"""
Failed to serialize or deserialize message content.
Base class for all serialization-related errors.
"""
class EncodeError(SerializationError):
"""
Cannot encode object for serialization.
Raised when an object cannot be serialized using the
specified serialization method.
"""
class DecodeError(SerializationError):
"""
Cannot decode serialized data.
Raised when serialized data cannot be deserialized,
either due to corruption or incompatible format.
"""Exceptions related to AMQP entities, channels, and binding operations.
class NotBoundError(KombuError):
"""
Trying to call channel method on unbound entity.
Raised when attempting to perform operations on exchanges,
queues, or other entities that haven't been bound to a channel.
"""
class MessageStateError(KombuError):
"""
Message already acknowledged or in invalid state.
Raised when attempting to acknowledge, reject, or requeue
a message that has already been processed.
"""Exceptions related to resource limits and capacity constraints.
class LimitExceeded(KombuError):
"""
Generic limit exceeded error.
Base class for various limit-related exceptions.
"""
class ConnectionLimitExceeded(LimitExceeded):
"""
Maximum number of simultaneous connections exceeded.
Raised when attempting to create more connections than
allowed by broker or client configuration.
"""
class ChannelLimitExceeded(LimitExceeded):
"""
Maximum number of channels per connection exceeded.
Raised when attempting to create more channels than
allowed per connection.
"""Exceptions related to version mismatches and compatibility issues.
class VersionMismatch(KombuError):
"""
Library dependency version mismatch.
Raised when required library versions are incompatible
with current Kombu version or each other.
"""
class SerializerNotInstalled(SerializationError):
"""
Required serialization library not installed.
Raised when attempting to use a serializer (like msgpack
or yaml) that requires additional packages not installed.
"""Exceptions related to content filtering and security restrictions.
class ContentDisallowed(KombuError):
"""
Consumer doesn't accept this content type.
Raised when attempting to deliver a message with content
type not in the consumer's accept list.
"""
class InconsistencyError(KombuError):
"""
Data or environment inconsistency detected.
Raised when internal state inconsistencies are detected
that may indicate configuration or data corruption issues.
"""Exceptions related to HTTP transports and network operations.
class HttpError(KombuError):
"""
HTTP client error.
Raised by HTTP-based transports when HTTP operations fail.
"""Helper functions for exception handling and re-raising.
def reraise(tp, value, tb=None):
"""
Reraise exception with preserved traceback.
Parameters:
- tp (type): Exception type
- value (Exception): Exception instance
- tb (traceback): Traceback object (optional)
Raises:
The provided exception with preserved traceback information.
"""from kombu import Connection, Producer, Consumer, Queue
from kombu.exceptions import (
KombuError, OperationalError, SerializationError,
NotBoundError, MessageStateError
)
def robust_message_handling():
try:
with Connection('redis://localhost:6379/0') as conn:
queue = Queue('test_queue')
# This might raise NotBoundError if queue not bound
queue.declare(channel=conn.channel())
producer = Producer(conn.channel())
producer.publish({'message': 'hello'}, routing_key='test')
except OperationalError as e:
print(f"Connection/transport error: {e}")
# Could implement retry logic here
except SerializationError as e:
print(f"Serialization error: {e}")
# Handle serialization issues
except NotBoundError as e:
print(f"Entity not bound to channel: {e}")
# Fix binding issues
except KombuError as e:
print(f"Generic Kombu error: {e}")
# Handle any other Kombu-specific errors
except Exception as e:
print(f"Unexpected error: {e}")
# Handle non-Kombu errorsfrom kombu import Connection, Consumer, Queue
from kombu.exceptions import MessageStateError, DecodeError
def safe_message_processor(body, message):
"""Process message with comprehensive error handling"""
try:
# Process the message
result = process_business_logic(body)
# Acknowledge successful processing
try:
message.ack()
except MessageStateError:
print("Message already acknowledged")
except DecodeError as e:
print(f"Failed to decode message: {e}")
# Reject malformed messages without requeue
try:
message.reject(requeue=False)
except MessageStateError:
pass # Already processed
except ValueError as e:
print(f"Business logic error: {e}")
# Requeue for retry
try:
message.reject(requeue=True)
except MessageStateError:
pass # Already processed
except Exception as e:
print(f"Unexpected processing error: {e}")
# Decide whether to requeue or reject
try:
message.reject(requeue=False) # Don't requeue unknown errors
except MessageStateError:
pass
def process_business_logic(data):
"""Business logic that might fail"""
if not isinstance(data, dict):
raise ValueError("Expected dict data")
if 'required_field' not in data:
raise ValueError("Missing required field")
return {'processed': True, 'result': data['required_field'] * 2}
# Usage
with Connection('redis://localhost:6379/0') as conn:
queue = Queue('error_handling_queue')
consumer = Consumer(conn.channel(), [queue], callbacks=[safe_message_processor])
consumer.consume()
# Process messages with error handling
conn.drain_events(timeout=1.0)from kombu import Connection
from kombu.exceptions import OperationalError, ConnectionLimitExceeded
import time
import random
def robust_connection_handler(broker_url, max_retries=5):
"""Handle connection with retry logic"""
retry_count = 0
backoff_base = 1
while retry_count < max_retries:
try:
conn = Connection(broker_url)
conn.connect() # Explicit connection
print("Connection established successfully")
return conn
except ConnectionLimitExceeded as e:
print(f"Connection limit exceeded: {e}")
# This might not be retryable
time.sleep(backoff_base * (2 ** retry_count))
retry_count += 1
except OperationalError as e:
print(f"Operational error (attempt {retry_count + 1}): {e}")
# Exponential backoff with jitter
sleep_time = backoff_base * (2 ** retry_count) + random.uniform(0, 1)
print(f"Retrying in {sleep_time:.2f} seconds...")
time.sleep(sleep_time)
retry_count += 1
except Exception as e:
print(f"Unexpected connection error: {e}")
break
print(f"Failed to establish connection after {max_retries} attempts")
return None
# Usage
conn = robust_connection_handler('redis://localhost:6379/0')
if conn:
try:
# Use connection
with conn:
# Perform operations
pass
finally:
conn.close()from kombu.serialization import dumps, loads, enable_insecure_serializers
from kombu.exceptions import EncodeError, DecodeError, SerializerNotInstalled
def safe_serialization_test():
"""Test serialization with error handling"""
# Test data
serializable_data = {'message': 'hello', 'number': 42}
unserializable_data = {'function': lambda x: x} # Functions can't be serialized with JSON
# Test JSON serialization (safe)
try:
serialized, content_type, encoding = dumps(serializable_data, 'json')
deserialized = loads(serialized, content_type, encoding)
print(f"JSON serialization successful: {deserialized}")
except EncodeError as e:
print(f"JSON encode error: {e}")
except DecodeError as e:
print(f"JSON decode error: {e}")
# Test with unserializable data
try:
serialized, content_type, encoding = dumps(unserializable_data, 'json')
except EncodeError as e:
print(f"Expected JSON encode error: {e}")
# Test unavailable serializer
try:
enable_insecure_serializers(['nonexistent'])
serialized, content_type, encoding = dumps(serializable_data, 'nonexistent')
except SerializerNotInstalled as e:
print(f"Serializer not available: {e}")
except KeyError as e:
print(f"Unknown serializer: {e}")
# Test corrupted data
try:
corrupted_data = b'{"invalid": json'
deserialized = loads(corrupted_data, 'application/json')
except DecodeError as e:
print(f"Expected decode error: {e}")
safe_serialization_test()from kombu import Connection, Consumer, Queue
from kombu.exceptions import KombuError, OperationalError, SerializationError
import logging
import traceback
from datetime import datetime
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ErrorTrackingConsumer:
def __init__(self, connection, queues):
self.connection = connection
self.queues = queues
self.error_counts = {}
def process_message(self, body, message):
"""Process message with detailed error tracking"""
message_id = body.get('id', 'unknown')
try:
# Simulate processing
if body.get('should_fail'):
raise ValueError("Simulated processing failure")
logger.info(f"Successfully processed message {message_id}")
message.ack()
except Exception as exc:
self.handle_processing_error(exc, message, message_id)
def handle_processing_error(self, exc, message, message_id):
"""Handle and log processing errors"""
error_type = type(exc).__name__
# Track error counts
self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1
# Log error details
logger.error(f"Processing error for message {message_id}: {exc}")
logger.error(f"Error type: {error_type}")
logger.error(f"Total {error_type} errors: {self.error_counts[error_type]}")
# Log stack trace for debugging
logger.debug(traceback.format_exc())
# Handle different error types
if isinstance(exc, SerializationError):
logger.error("Serialization error - rejecting message")
message.reject(requeue=False)
elif isinstance(exc, ValueError):
logger.warning("Business logic error - requeuing for retry")
message.reject(requeue=True)
elif isinstance(exc, KombuError):
logger.error("Kombu-specific error - investigating")
message.reject(requeue=False)
else:
logger.error("Unknown error type - rejecting without requeue")
message.reject(requeue=False)
def run(self):
"""Main consumer loop with connection error handling"""
while True:
try:
consumer = Consumer(
self.connection.channel(),
self.queues,
callbacks=[self.process_message]
)
consumer.consume()
while True:
self.connection.drain_events(timeout=1.0)
except OperationalError as e:
logger.error(f"Connection error: {e}")
logger.info("Attempting to reconnect...")
time.sleep(5)
continue
except KeyboardInterrupt:
logger.info("Shutting down consumer...")
break
except Exception as e:
logger.error(f"Unexpected error: {e}")
logger.error(traceback.format_exc())
break
# Print final error summary
if self.error_counts:
logger.info("Error summary:")
for error_type, count in self.error_counts.items():
logger.info(f" {error_type}: {count} occurrences")
# Usage
if __name__ == '__main__':
with Connection('redis://localhost:6379/0') as conn:
queue = Queue('error_tracking_queue')
consumer = ErrorTrackingConsumer(conn, [queue])
consumer.run()from kombu.exceptions import KombuError
class CustomProcessingError(KombuError):
"""Custom error for application-specific failures"""
def __init__(self, message, error_code=None, retry_after=None):
super().__init__(message)
self.error_code = error_code
self.retry_after = retry_after
class DataValidationError(CustomProcessingError):
"""Error for data validation failures"""
pass
class ExternalServiceError(CustomProcessingError):
"""Error for external service failures"""
pass
def process_with_custom_errors(body, message):
"""Process message with custom error types"""
try:
# Validate data
if not body.get('user_id'):
raise DataValidationError(
"Missing user_id field",
error_code='MISSING_USER_ID'
)
# Call external service
if body.get('external_service_down'):
raise ExternalServiceError(
"External service unavailable",
error_code='SERVICE_DOWN',
retry_after=300 # Retry after 5 minutes
)
# Process successfully
message.ack()
except DataValidationError as e:
logger.error(f"Validation error: {e} (code: {e.error_code})")
message.reject(requeue=False) # Don't retry validation errors
except ExternalServiceError as e:
logger.warning(f"Service error: {e} (retry after: {e.retry_after}s)")
message.reject(requeue=True) # Retry service errors
except CustomProcessingError as e:
logger.error(f"Custom processing error: {e}")
message.reject(requeue=False)
except Exception as e:
logger.error(f"Unexpected error: {e}")
message.reject(requeue=False)from kombu import Connection
from kombu.exceptions import KombuError
import sys
def debug_kombu_exceptions():
"""Demonstrate exception information and debugging"""
try:
# Simulate various Kombu errors
conn = Connection('invalid://broker:9999')
conn.connect()
except KombuError as e:
print("Kombu Exception Details:")
print(f" Type: {type(e).__name__}")
print(f" Message: {str(e)}")
print(f" Module: {e.__class__.__module__}")
# Print exception hierarchy
print(" Exception hierarchy:")
for cls in type(e).__mro__:
if cls == object:
break
print(f" {cls.__name__}")
# Print traceback for debugging
import traceback
print(" Traceback:")
traceback.print_exc()
except Exception as e:
print(f"Non-Kombu exception: {type(e).__name__}: {e}")
debug_kombu_exceptions()Install with Tessl CLI
npx tessl i tessl/pypi-kombu