CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kombu

Messaging library for Python that provides an idiomatic high-level interface for AMQP and other message brokers.

Pending
Overview
Eval results
Files

exceptions.mddocs/

Exception Handling

Comprehensive exception hierarchy for handling messaging errors, connection issues, and serialization problems. Kombu provides a structured set of exceptions that help identify and handle different types of failures in messaging applications.

Capabilities

Base Exception Classes

Foundation exception classes that provide the base hierarchy for all Kombu-specific errors.

class KombuError(Exception):
    """
    Common base class for all Kombu exceptions.
    
    All Kombu-specific exceptions inherit from this class,
    making it easy to catch any Kombu-related error.
    """

class OperationalError(KombuError):
    """
    Recoverable message transport connection error.
    
    Indicates a temporary error that may be resolved by retrying
    the operation, such as network connectivity issues or
    temporary broker unavailability.
    """

Serialization Exceptions

Exceptions related to message serialization and deserialization failures.

class SerializationError(KombuError):
    """
    Failed to serialize or deserialize message content.
    
    Base class for all serialization-related errors.
    """

class EncodeError(SerializationError):
    """
    Cannot encode object for serialization.
    
    Raised when an object cannot be serialized using the
    specified serialization method.
    """

class DecodeError(SerializationError):
    """
    Cannot decode serialized data.
    
    Raised when serialized data cannot be deserialized,
    either due to corruption or incompatible format.
    """

Entity and Channel Exceptions

Exceptions related to AMQP entities, channels, and binding operations.

class NotBoundError(KombuError):
    """
    Trying to call channel method on unbound entity.
    
    Raised when attempting to perform operations on exchanges,
    queues, or other entities that haven't been bound to a channel.
    """

class MessageStateError(KombuError):
    """
    Message already acknowledged or in invalid state.
    
    Raised when attempting to acknowledge, reject, or requeue
    a message that has already been processed.
    """

Resource Limit Exceptions

Exceptions related to resource limits and capacity constraints.

class LimitExceeded(KombuError):
    """
    Generic limit exceeded error.
    
    Base class for various limit-related exceptions.
    """

class ConnectionLimitExceeded(LimitExceeded):
    """
    Maximum number of simultaneous connections exceeded.
    
    Raised when attempting to create more connections than
    allowed by broker or client configuration.
    """

class ChannelLimitExceeded(LimitExceeded):
    """
    Maximum number of channels per connection exceeded.
    
    Raised when attempting to create more channels than
    allowed per connection.
    """

Version and Compatibility Exceptions

Exceptions related to version mismatches and compatibility issues.

class VersionMismatch(KombuError):
    """
    Library dependency version mismatch.
    
    Raised when required library versions are incompatible
    with current Kombu version or each other.
    """

class SerializerNotInstalled(SerializationError):
    """
    Required serialization library not installed.
    
    Raised when attempting to use a serializer (like msgpack
    or yaml) that requires additional packages not installed.
    """

Content and Security Exceptions

Exceptions related to content filtering and security restrictions.

class ContentDisallowed(KombuError):
    """
    Consumer doesn't accept this content type.
    
    Raised when attempting to deliver a message with content
    type not in the consumer's accept list.
    """

class InconsistencyError(KombuError):
    """
    Data or environment inconsistency detected.
    
    Raised when internal state inconsistencies are detected
    that may indicate configuration or data corruption issues.
    """

HTTP and Network Exceptions

Exceptions related to HTTP transports and network operations.

class HttpError(KombuError):
    """
    HTTP client error.
    
    Raised by HTTP-based transports when HTTP operations fail.
    """

Utility Functions

Helper functions for exception handling and re-raising.

def reraise(tp, value, tb=None):
    """
    Reraise exception with preserved traceback.
    
    Parameters:
    - tp (type): Exception type
    - value (Exception): Exception instance
    - tb (traceback): Traceback object (optional)
    
    Raises:
    The provided exception with preserved traceback information.
    """

Usage Examples

Basic Exception Handling

from kombu import Connection, Producer, Consumer, Queue
from kombu.exceptions import (
    KombuError, OperationalError, SerializationError,
    NotBoundError, MessageStateError
)

def robust_message_handling():
    try:
        with Connection('redis://localhost:6379/0') as conn:
            queue = Queue('test_queue')
            
            # This might raise NotBoundError if queue not bound
            queue.declare(channel=conn.channel())
            
            producer = Producer(conn.channel())
            producer.publish({'message': 'hello'}, routing_key='test')
            
    except OperationalError as e:
        print(f"Connection/transport error: {e}")
        # Could implement retry logic here
        
    except SerializationError as e:
        print(f"Serialization error: {e}")
        # Handle serialization issues
        
    except NotBoundError as e:
        print(f"Entity not bound to channel: {e}")
        # Fix binding issues
        
    except KombuError as e:
        print(f"Generic Kombu error: {e}")
        # Handle any other Kombu-specific errors
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        # Handle non-Kombu errors

Message Processing Error Handling

from kombu import Connection, Consumer, Queue
from kombu.exceptions import MessageStateError, DecodeError

def safe_message_processor(body, message):
    """Process message with comprehensive error handling"""
    try:
        # Process the message
        result = process_business_logic(body)
        
        # Acknowledge successful processing
        try:
            message.ack()
        except MessageStateError:
            print("Message already acknowledged")
            
    except DecodeError as e:
        print(f"Failed to decode message: {e}")
        # Reject malformed messages without requeue
        try:
            message.reject(requeue=False)
        except MessageStateError:
            pass  # Already processed
            
    except ValueError as e:
        print(f"Business logic error: {e}")
        # Requeue for retry
        try:
            message.reject(requeue=True)
        except MessageStateError:
            pass  # Already processed
            
    except Exception as e:
        print(f"Unexpected processing error: {e}")
        # Decide whether to requeue or reject
        try:
            message.reject(requeue=False)  # Don't requeue unknown errors
        except MessageStateError:
            pass

def process_business_logic(data):
    """Business logic that might fail"""
    if not isinstance(data, dict):
        raise ValueError("Expected dict data")
    
    if 'required_field' not in data:
        raise ValueError("Missing required field")
    
    return {'processed': True, 'result': data['required_field'] * 2}

# Usage
with Connection('redis://localhost:6379/0') as conn:
    queue = Queue('error_handling_queue')
    consumer = Consumer(conn.channel(), [queue], callbacks=[safe_message_processor])
    consumer.consume()
    
    # Process messages with error handling
    conn.drain_events(timeout=1.0)

Connection and Transport Error Handling

from kombu import Connection
from kombu.exceptions import OperationalError, ConnectionLimitExceeded
import time
import random

def robust_connection_handler(broker_url, max_retries=5):
    """Handle connection with retry logic"""
    retry_count = 0
    backoff_base = 1
    
    while retry_count < max_retries:
        try:
            conn = Connection(broker_url)
            conn.connect()  # Explicit connection
            
            print("Connection established successfully")
            return conn
            
        except ConnectionLimitExceeded as e:
            print(f"Connection limit exceeded: {e}")
            # This might not be retryable
            time.sleep(backoff_base * (2 ** retry_count))
            retry_count += 1
            
        except OperationalError as e:
            print(f"Operational error (attempt {retry_count + 1}): {e}")
            
            # Exponential backoff with jitter
            sleep_time = backoff_base * (2 ** retry_count) + random.uniform(0, 1)
            print(f"Retrying in {sleep_time:.2f} seconds...")
            time.sleep(sleep_time)
            retry_count += 1
            
        except Exception as e:
            print(f"Unexpected connection error: {e}")
            break
    
    print(f"Failed to establish connection after {max_retries} attempts")
    return None

# Usage
conn = robust_connection_handler('redis://localhost:6379/0')
if conn:
    try:
        # Use connection
        with conn:
            # Perform operations
            pass
    finally:
        conn.close()

Serialization Error Handling

from kombu.serialization import dumps, loads, enable_insecure_serializers
from kombu.exceptions import EncodeError, DecodeError, SerializerNotInstalled

def safe_serialization_test():
    """Test serialization with error handling"""
    
    # Test data
    serializable_data = {'message': 'hello', 'number': 42}
    unserializable_data = {'function': lambda x: x}  # Functions can't be serialized with JSON
    
    # Test JSON serialization (safe)
    try:
        serialized, content_type, encoding = dumps(serializable_data, 'json')
        deserialized = loads(serialized, content_type, encoding)
        print(f"JSON serialization successful: {deserialized}")
    except EncodeError as e:
        print(f"JSON encode error: {e}")
    except DecodeError as e:
        print(f"JSON decode error: {e}")
    
    # Test with unserializable data
    try:
        serialized, content_type, encoding = dumps(unserializable_data, 'json')
    except EncodeError as e:
        print(f"Expected JSON encode error: {e}")
    
    # Test unavailable serializer
    try:
        enable_insecure_serializers(['nonexistent'])
        serialized, content_type, encoding = dumps(serializable_data, 'nonexistent')
    except SerializerNotInstalled as e:
        print(f"Serializer not available: {e}")
    except KeyError as e:
        print(f"Unknown serializer: {e}")
    
    # Test corrupted data
    try:
        corrupted_data = b'{"invalid": json'
        deserialized = loads(corrupted_data, 'application/json')
    except DecodeError as e:
        print(f"Expected decode error: {e}")

safe_serialization_test()

Exception Logging and Monitoring

from kombu import Connection, Consumer, Queue
from kombu.exceptions import KombuError, OperationalError, SerializationError
import logging
import traceback
from datetime import datetime

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ErrorTrackingConsumer:
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues
        self.error_counts = {}
    
    def process_message(self, body, message):
        """Process message with detailed error tracking"""
        message_id = body.get('id', 'unknown')
        
        try:
            # Simulate processing
            if body.get('should_fail'):
                raise ValueError("Simulated processing failure")
            
            logger.info(f"Successfully processed message {message_id}")
            message.ack()
            
        except Exception as exc:
            self.handle_processing_error(exc, message, message_id)
    
    def handle_processing_error(self, exc, message, message_id):
        """Handle and log processing errors"""
        error_type = type(exc).__name__
        
        # Track error counts
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1
        
        # Log error details
        logger.error(f"Processing error for message {message_id}: {exc}")
        logger.error(f"Error type: {error_type}")
        logger.error(f"Total {error_type} errors: {self.error_counts[error_type]}")
        
        # Log stack trace for debugging
        logger.debug(traceback.format_exc())
        
        # Handle different error types
        if isinstance(exc, SerializationError):
            logger.error("Serialization error - rejecting message")
            message.reject(requeue=False)
            
        elif isinstance(exc, ValueError):
            logger.warning("Business logic error - requeuing for retry")
            message.reject(requeue=True)
            
        elif isinstance(exc, KombuError):
            logger.error("Kombu-specific error - investigating")
            message.reject(requeue=False)
            
        else:
            logger.error("Unknown error type - rejecting without requeue")
            message.reject(requeue=False)
    
    def run(self):
        """Main consumer loop with connection error handling"""
        while True:
            try:
                consumer = Consumer(
                    self.connection.channel(),
                    self.queues,
                    callbacks=[self.process_message]
                )
                
                consumer.consume()
                
                while True:
                    self.connection.drain_events(timeout=1.0)
                    
            except OperationalError as e:
                logger.error(f"Connection error: {e}")
                logger.info("Attempting to reconnect...")
                time.sleep(5)
                continue
                
            except KeyboardInterrupt:
                logger.info("Shutting down consumer...")
                break
                
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                logger.error(traceback.format_exc())
                break
        
        # Print final error summary
        if self.error_counts:
            logger.info("Error summary:")
            for error_type, count in self.error_counts.items():
                logger.info(f"  {error_type}: {count} occurrences")

# Usage
if __name__ == '__main__':
    with Connection('redis://localhost:6379/0') as conn:
        queue = Queue('error_tracking_queue')
        consumer = ErrorTrackingConsumer(conn, [queue])
        consumer.run()

Custom Exception Handling

from kombu.exceptions import KombuError

class CustomProcessingError(KombuError):
    """Custom error for application-specific failures"""
    def __init__(self, message, error_code=None, retry_after=None):
        super().__init__(message)
        self.error_code = error_code
        self.retry_after = retry_after

class DataValidationError(CustomProcessingError):
    """Error for data validation failures"""
    pass

class ExternalServiceError(CustomProcessingError):
    """Error for external service failures"""
    pass

def process_with_custom_errors(body, message):
    """Process message with custom error types"""
    try:
        # Validate data
        if not body.get('user_id'):
            raise DataValidationError(
                "Missing user_id field",
                error_code='MISSING_USER_ID'
            )
        
        # Call external service
        if body.get('external_service_down'):
            raise ExternalServiceError(
                "External service unavailable",
                error_code='SERVICE_DOWN',
                retry_after=300  # Retry after 5 minutes
            )
        
        # Process successfully
        message.ack()
        
    except DataValidationError as e:
        logger.error(f"Validation error: {e} (code: {e.error_code})")
        message.reject(requeue=False)  # Don't retry validation errors
        
    except ExternalServiceError as e:
        logger.warning(f"Service error: {e} (retry after: {e.retry_after}s)")
        message.reject(requeue=True)  # Retry service errors
        
    except CustomProcessingError as e:
        logger.error(f"Custom processing error: {e}")
        message.reject(requeue=False)
        
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        message.reject(requeue=False)

Exception Context and Debugging

from kombu import Connection
from kombu.exceptions import KombuError
import sys

def debug_kombu_exceptions():
    """Demonstrate exception information and debugging"""
    
    try:
        # Simulate various Kombu errors
        conn = Connection('invalid://broker:9999')
        conn.connect()
        
    except KombuError as e:
        print("Kombu Exception Details:")
        print(f"  Type: {type(e).__name__}")
        print(f"  Message: {str(e)}")
        print(f"  Module: {e.__class__.__module__}")
        
        # Print exception hierarchy
        print("  Exception hierarchy:")
        for cls in type(e).__mro__:
            if cls == object:
                break
            print(f"    {cls.__name__}")
        
        # Print traceback for debugging
        import traceback
        print("  Traceback:")
        traceback.print_exc()
        
    except Exception as e:
        print(f"Non-Kombu exception: {type(e).__name__}: {e}")

debug_kombu_exceptions()

Install with Tessl CLI

npx tessl i tessl/pypi-kombu

docs

compression.md

connection.md

entities.md

exceptions.md

index.md

messaging.md

mixins.md

pools.md

serialization.md

simple.md

tile.json