CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-uamqp

An AMQP 1.0 client library for Python

Overview
Eval results
Files

docs/error-handling.md

Error Handling

Comprehensive error handling with custom exceptions, error policies, and retry mechanisms for robust messaging applications that can gracefully handle network issues, authentication failures, and protocol errors.

Capabilities

Error Policy Configuration

Configurable error handling policies that define how clients should respond to different types of errors.

class ErrorPolicy:
    """Signature outline of the error policy; method bodies are intentionally omitted."""

    def __init__(self, max_retries=3, on_error=None):
        """
        Configure how a client reacts to errors.

        Parameters:
        - max_retries (int): Upper bound on the number of retry attempts
        - on_error (ErrorAction): Action applied by default when an error occurs
        """

    def on_unrecognized_error(self, error):
        """Respond to an error that is not recognized."""

    def on_message_error(self, error):
        """Respond to an error at the message level."""

    def on_link_error(self, error):
        """Respond to an error at the link level."""

    def on_connection_error(self, error):
        """Respond to an error at the connection level."""

Error Actions

Configuration for specific error handling actions including retry behavior and backoff strategies.

class ErrorAction:
    """Signature outline of an error action; method bodies are intentionally omitted."""

    def __init__(self, retry=True, backoff=1.0, increment_retries=True):
        """
        Error action specification.

        Parameters:
        - retry (bool): Whether to retry the operation
        - backoff (float): Backoff delay in seconds
        - increment_retries (bool): Whether to increment retry counter
        """

    # The properties below were previously written as `def retry: bool`, which
    # is not valid Python; the return type belongs in a `->` annotation.
    @property
    def retry(self) -> bool:
        """Whether to retry the failed operation."""

    @property
    def backoff(self) -> float:
        """Backoff delay before retry in seconds."""

    @property
    def increment_retries(self) -> bool:
        """Whether to count this as a retry attempt."""

Usage Example:

from uamqp.errors import ErrorPolicy, ErrorAction

# Helper: build a retry action whose delay doubles with each attempt
def create_retry_action(attempt):
    """Return a retrying ErrorAction with a backoff of 2**attempt seconds, at most 60."""
    ceiling = 60.0  # never wait longer than one minute
    delay = min(ceiling, 2.0 ** attempt)
    return ErrorAction(retry=True, backoff=delay)

# Policy allowing at most 5 retries; by default every error is retried after a
# fixed 1.0 second backoff.
# NOTE(review): create_retry_action() is defined above but never used here, so
# this policy does not apply exponential backoff — confirm the intended wiring.
error_policy = ErrorPolicy(
    max_retries=5,
    on_error=ErrorAction(retry=True, backoff=1.0)
)

# Use with client
# `target` and `auth` are not defined in this example; the application supplies them.
from uamqp import SendClient
client = SendClient(target, auth=auth, error_policy=error_policy)

Exception Hierarchy

Base AMQP Errors

Root exception classes for AMQP-related errors.

class AMQPError(Exception):
    """Root of the AMQP exception hierarchy; every AMQP-related error derives from it."""

class AMQPConnectionError(AMQPError):
    """A failure of the connection, covering network as well as protocol problems."""

class AMQPClientShutdown(AMQPError):
    """Signals that the client is in the process of shutting down."""

class MessageHandlerError(AMQPError):
    """A failure while handling or processing messages."""

Connection Errors

Errors related to AMQP connection management and lifecycle.

class ConnectionClose(AMQPConnectionError):
    """The broker closed the connection."""

class VendorConnectionClose(ConnectionClose):
    """A connection closure that is vendor-specific."""

Link Errors

Errors related to AMQP links (message senders and receivers).

class LinkDetach(AMQPError):
    """The remote peer detached the link."""

class VendorLinkDetach(LinkDetach):
    """A link detachment that is vendor-specific."""

class LinkRedirect(AMQPError):
    """The link was redirected to a different endpoint."""

Authentication Errors

Errors related to authentication and authorization.

class AuthenticationException(AMQPError):
    """Authentication failed (general case)."""

class TokenExpired(AuthenticationException):
    """The authentication token is no longer valid because it expired."""

class TokenAuthFailure(AuthenticationException):
    """Authentication failed specifically in the token-based flow."""

Message Errors

Errors related to message processing and state management.

class MessageResponse(AMQPError):
    """Common base for errors that represent a response to a message."""

class MessageException(MessageResponse):
    """A general error while processing a message."""

class MessageSendFailed(MessageException):
    """The operation of sending a message failed."""

class MessageContentTooLarge(MessageException):
    """The message is larger than the maximum size allowed."""

Message Settlement Responses

Specific responses to message settlement that may be treated as exceptions.

class MessageAlreadySettled(MessageResponse):
    """Settlement was attempted on a message that is already settled."""

class MessageAccepted(MessageResponse):
    """The receiver accepted the message."""

class MessageRejected(MessageResponse):
    """The receiver rejected the message."""

class MessageReleased(MessageResponse):
    """The receiver released the message."""

class MessageModified(MessageResponse):
    """The receiver modified the message."""

Timeout Errors

Errors related to operation timeouts.

class ClientTimeout(AMQPError):
    """An operation performed by the client exceeded its time limit."""

Error Handling Patterns

Basic Exception Handling

Handle common AMQP errors with appropriate recovery strategies.

from uamqp import send_message, receive_message
from uamqp.errors import (
    AMQPConnectionError,
    AuthenticationException, 
    MessageSendFailed,
    ClientTimeout
)

def robust_send_message(target, message, auth):
    """
    Send a single message, making up to three attempts on retryable failures.

    Parameters:
    - target: Destination, passed through unchanged to send_message
    - message: Message, passed through unchanged to send_message
    - auth: Authentication object, passed as send_message(auth=...)

    Returns:
    The value returned by send_message on the first successful attempt.

    Raises:
    - AuthenticationException: immediately, authentication failures are not retried
    - AMQPConnectionError, MessageSendFailed, ClientTimeout: re-raised when
      the final attempt fails
    """
    # Imported here because this example's import list does not bring in the
    # time module; without it the sleeps below raise NameError.
    import time

    max_attempts = 3

    for attempt in range(max_attempts):
        is_last_attempt = attempt == max_attempts - 1
        try:
            result = send_message(target, message, auth=auth)
            print(f"Message sent successfully: {result}")
            return result

        except AMQPConnectionError as e:
            print(f"Connection error (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff: 1 s, then 2 s

        except AuthenticationException as e:
            print(f"Authentication failed: {e}")
            # Don't retry auth failures
            raise

        except MessageSendFailed as e:
            print(f"Send failed (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            time.sleep(1)  # fixed one-second pause before resending

        except ClientTimeout as e:
            print(f"Timeout (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            # Timeouts are retried right away, with no additional delay

Message Settlement Error Handling

Handle message settlement responses appropriately.

from uamqp.errors import (
    MessageRejected,
    MessageReleased, 
    MessageModified,
    MessageAlreadySettled
)

def process_message_safely(message):
    """
    Run the business logic on one received message and settle the message.

    On success the message is accepted and the business-logic result is
    returned. Every handled failure path returns None:
    - ValueError: the message is rejected with condition "invalid-data"
    - MessageRejected / MessageReleased / MessageAlreadySettled: only reported
    - any other Exception: the message is released, ignoring the case where
      it turns out to be settled already
    """
    try:
        payload = message.get_data()
        outcome = process_business_logic(payload)

        # Settle as accepted only once processing has completed
        message.accept()
        return outcome

    except ValueError as e:
        # The payload itself is bad, so reject rather than ask for redelivery
        print(f"Invalid message data: {e}")
        message.reject(condition="invalid-data", description=str(e))

    except MessageRejected as e:
        # Reported for later analysis; the message is not processed again
        print(f"Message was rejected: {e}")

    except MessageReleased as e:
        # Nothing else to do here: a released message is redelivered
        print(f"Message was released: {e}")

    except MessageAlreadySettled as e:
        # Already handled elsewhere; carry on
        print(f"Message already settled: {e}")

    except Exception as e:
        # Anything unforeseen: hand the message back so it can be retried
        print(f"Unexpected error: {e}")
        try:
            message.release()
        except MessageAlreadySettled:
            pass  # Message already handled

Client Error Handling with Policies

Use error policies for automatic retry behavior.

from uamqp import SendClient
from uamqp.errors import ErrorPolicy, ErrorAction

def create_resilient_client(target, auth):
    """
    Build a SendClient whose error policy retries connection errors selectively.

    The policy allows 3 retries. Connection errors whose text contains
    "authentication" (compared case-insensitively) get a no-retry action;
    all other connection errors get a retry action with a 2.0 second backoff.
    """
    # The two actions the connection-error handler chooses between
    keep_trying = ErrorAction(retry=True, backoff=2.0)
    give_up = ErrorAction(retry=False)

    policy = ErrorPolicy(max_retries=3)

    def choose_connection_action(error):
        looks_like_auth_failure = "authentication" in str(error).lower()
        if looks_like_auth_failure:
            return give_up
        return keep_trying

    # Replace the policy's connection-error handler on this instance only
    policy.on_connection_error = choose_connection_action

    return SendClient(target, auth=auth, error_policy=policy)

# Usage
# AMQPError is caught below, so it must be imported for this example to run
from uamqp.errors import AMQPError

# `target`, `auth` and `message` are placeholders supplied by the application
try:
    with create_resilient_client(target, auth) as client:
        client.queue_message(message)
        results = client.send_all_messages()
except AMQPError as e:
    print(f"All retry attempts failed: {e}")

Async Error Handling

Handle errors in async operations with proper coroutine error management.

import asyncio
from uamqp.async_ops import SendClientAsync
from uamqp.errors import AMQPConnectionError, MessageSendFailed

async def robust_async_send(target, messages, auth):
    """
    Send a batch of messages asynchronously, making up to three attempts.

    A new SendClientAsync is opened for every attempt. Connection errors wait
    2**attempt seconds before the next attempt and send failures wait one
    second; the error from the final attempt is re-raised.

    Returns the value produced by the client's batch send call.
    """
    total_attempts = 3
    final_attempt = total_attempts - 1

    for attempt in range(total_attempts):
        try:
            async with SendClientAsync(target, auth=auth) as client:
                # NOTE(review): confirm SendClientAsync really exposes
                # send_message_batch_async — TODO verify against the uamqp API.
                results = await client.send_message_batch_async(messages)
                print(f"Sent {len(results)} messages successfully")
                return results

        except AMQPConnectionError as e:
            print(f"Connection error (attempt {attempt + 1}): {e}")
            if attempt == final_attempt:
                raise
            pause = 2 ** attempt  # 1 s, then 2 s
            await asyncio.sleep(pause)

        except MessageSendFailed as e:
            print(f"Send failed (attempt {attempt + 1}): {e}")
            if attempt == final_attempt:
                raise
            await asyncio.sleep(1)

# Usage in async context
async def main():
    """Run robust_async_send once and report the failure if every attempt fails."""
    # Imported locally because this example's import list omits AMQPError,
    # which the except clause below needs.
    from uamqp.errors import AMQPError

    # `target`, `messages` and `auth` are placeholders supplied by the application
    try:
        results = await robust_async_send(target, messages, auth)
    except AMQPError as e:
        print(f"Failed after all retries: {e}")

asyncio.run(main())

Context Manager Error Handling

Proper resource cleanup with error handling in context managers.

from uamqp import ReceiveClient
from uamqp.errors import AMQPError

class RobustReceiveClient:
    """
    Context manager that opens a ReceiveClient on entry and closes it on exit.

    Entering returns the underlying ReceiveClient. Exceptions raised inside
    the with-block are never suppressed; connection and authentication
    failures additionally print a hint.

    Requires ReceiveClient, AMQPError, AMQPConnectionError and
    AuthenticationException to be importable in the enclosing module (the last
    two come from uamqp.errors).
    """

    def __init__(self, source, auth, **kwargs):
        """
        Parameters:
        - source: Source passed to ReceiveClient
        - auth: Authentication object passed as ReceiveClient(auth=...)
        - **kwargs: Extra keyword arguments forwarded to ReceiveClient
        """
        self.source = source
        self.auth = auth
        self.client_kwargs = kwargs
        self.client = None

    def __enter__(self):
        """Create and open the client; close it again if opening fails with AMQPError."""
        try:
            self.client = ReceiveClient(self.source, auth=self.auth, **self.client_kwargs)
            self.client.open()
            return self.client
        except AMQPError:
            if self.client is not None:
                try:
                    self.client.close()
                except Exception:
                    # Best-effort cleanup: the original error is the one that
                    # matters. Exception (not a bare except) so that
                    # KeyboardInterrupt / SystemExit still propagate.
                    pass
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the client, print a hint for known error types, never suppress."""
        if self.client is not None:
            try:
                self.client.close()
            except AMQPError as e:
                print(f"Error during cleanup: {e}")
                # Don't re-raise cleanup errors

        # exc_type is None when the with-block finished normally. issubclass
        # (rather than ==) also matches subclasses such as ConnectionClose.
        if exc_type is not None:
            if issubclass(exc_type, AMQPConnectionError):
                print("Connection error occurred, consider retry")
            elif issubclass(exc_type, AuthenticationException):
                print("Authentication failed, check credentials")

        return False  # Never suppress: the original exception propagates

# Usage
# `source` and `auth` are placeholders supplied by the application
try:
    with RobustReceiveClient(source, auth) as client:
        # timeout is presumably in milliseconds (i.e. 30 s) — confirm against the ReceiveClient API
        messages = client.receive_message_batch(timeout=30000)
        for message in messages:
            process_message_safely(message)
except AMQPError as e:
    # Reached for AMQP errors raised while opening the client or inside the with-block
    print(f"Client operation failed: {e}")

Debugging and Diagnostics

Enable Debug Logging

Use debug mode to get detailed protocol-level information for troubleshooting.

import logging

# Enable uAMQP debug logging
# Configure the root logger at INFO level
logging.basicConfig(level=logging.INFO)
# Lower only the 'uamqp' logger to DEBUG
logger = logging.getLogger('uamqp')
logger.setLevel(logging.DEBUG)

# Use debug mode in clients
# `target` and `auth` are placeholders supplied by the application
client = SendClient(target, auth=auth, debug=True)

Error Information Extraction

Extract detailed error information for analysis and logging.

def log_error_details(error):
    """
    Print a short diagnostic report for an exception.

    The exception's class name and message are always printed. The fields
    condition, description and info are printed, in that order, only when the
    exception object has an attribute of that name.
    """
    print(f"Error type: {type(error).__name__}")
    print(f"Error message: {str(error)}")

    # Optional AMQP-specific attributes, each paired with its reporter
    optional_reports = (
        ('condition', lambda: print(f"AMQP condition: {error.condition}")),
        ('description', lambda: print(f"AMQP description: {error.description}")),
        ('info', lambda: print(f"AMQP info: {error.info}")),
    )
    for attribute, report in optional_reports:
        if hasattr(error, attribute):
            report()

# Skeleton showing where log_error_details fits; replace `pass` with the real call
try:
    # AMQP operation
    pass
except AMQPError as e:
    # Print the error's type, message and any AMQP-specific fields it carries
    log_error_details(e)
    # Decide on recovery action based on error details

Health Monitoring

Implement health checks and monitoring for AMQP connections.

import time
from uamqp.errors import AMQPError

class ConnectionHealthMonitor:
    """
    Tracks connection health by sending a small probe message on demand.

    Instance attributes:
    - last_error: the AMQPError from the most recent failed check, else None
    - error_count: number of failed checks since the last successful one
    - last_check: time.time() of the most recent check_health() call, or
      None if no check has run yet
    """

    def __init__(self, target, auth):
        """
        Parameters:
        - target: Destination the probe message is sent to
        - auth: Authentication object passed as send_message(auth=...)
        """
        self.target = target
        self.auth = auth
        self.last_error = None
        self.error_count = 0
        self.last_check = None  # filled in by check_health()

    def check_health(self):
        """
        Send one "health-check" message and record the outcome.

        Returns True when the send succeeds (error tracking is reset) and
        False when it raises AMQPError (the error is stored and counted).
        Exceptions other than AMQPError propagate to the caller.
        """
        # Simple health check using high-level API
        from uamqp import send_message, Message

        # Stamp the attempt itself so get_health_status() reports when the
        # connection was last probed, not when the status was read.
        self.last_check = time.time()

        try:
            test_message = Message("health-check")
            send_message(self.target, test_message, auth=self.auth)
        except AMQPError as e:
            self.last_error = e
            self.error_count += 1
            return False

        # Reset error tracking on success
        self.last_error = None
        self.error_count = 0
        return True

    def get_health_status(self):
        """
        Return a snapshot dict with the keys 'healthy', 'last_error',
        'error_count' and 'last_check'.

        'healthy' is True whenever no error is stored, which includes the
        state before any check has been run.
        """
        has_error = self.last_error is not None
        return {
            'healthy': not has_error,
            'last_error': str(self.last_error) if has_error else None,
            'error_count': self.error_count,
            'last_check': self.last_check
        }

# Usage
# `target` and `auth` are placeholders supplied by the application
monitor = ConnectionHealthMonitor(target, auth)
# check_health() returns False when the probe send raised AMQPError
if not monitor.check_health():
    status = monitor.get_health_status()
    print(f"Connection unhealthy: {status}")

Install with Tessl CLI

npx tessl i tessl/pypi-uamqp

docs

address-endpoints.md

async-operations.md

authentication.md

client-apis.md

connection-session.md

error-handling.md

high-level-messaging.md

index.md

low-level-protocol.md

message-management.md

types-constants.md

tile.json