An AMQP 1.0 client library for Python
Comprehensive error handling with custom exceptions, error policies, and retry mechanisms for robust messaging applications that can gracefully handle network issues, authentication failures, and protocol errors.
Configurable error handling policies that define how clients should respond to different types of errors.
class ErrorPolicy:
    """Error handling policy that defines how a client responds to errors.

    This is an API signature listing: the method bodies shown here contain
    only docstrings and document the hooks that are available.
    """

    def __init__(self, max_retries=3, on_error=None):
        """
        Error handling policy configuration.

        Parameters:
        - max_retries (int): Maximum number of retry attempts
        - on_error (ErrorAction): Default error action to take
        """

    def on_unrecognized_error(self, error):
        """Handle unrecognized errors."""

    def on_message_error(self, error):
        """Handle message-level errors."""

    def on_link_error(self, error):
        """Handle link-level errors."""

    def on_connection_error(self, error):
        """Handle connection-level errors."""


# Configuration for specific error handling actions including retry behavior and backoff strategies.
class ErrorAction:
    """Specification of the action to take in response to an error.

    This is an API signature listing: the constructor and the read-only
    properties are declared with docstrings only, no implementation.
    """

    def __init__(self, retry=True, backoff=1.0, increment_retries=True):
        """
        Error action specification.

        Parameters:
        - retry (bool): Whether to retry the operation
        - backoff (float): Backoff delay in seconds
        - increment_retries (bool): Whether to increment retry counter
        """

    @property
    def retry(self) -> bool:
        """Whether to retry the failed operation."""

    @property
    def backoff(self) -> float:
        """Backoff delay before retry in seconds."""

    @property
    def increment_retries(self) -> bool:
        """Whether to count this as a retry attempt."""


# Usage Example:
from uamqp.errors import ErrorPolicy, ErrorAction
# Custom error policy with exponential backoff
def create_retry_action(attempt):
    """Build a retrying ErrorAction whose backoff doubles with each attempt.

    Parameters:
    - attempt (int): Attempt number; the delay is 2 ** attempt seconds

    Returns:
    - ErrorAction: retry=True with the computed delay, at most 60 seconds
    """
    max_backoff = 60.0  # Cap at 60 seconds
    # 2.0 ** 6 == 64.0 already exceeds the cap, so clamping the exponent
    # yields the same delay while avoiding OverflowError for huge attempts.
    backoff_delay = min(2.0 ** min(attempt, 6), max_backoff)
    return ErrorAction(retry=True, backoff=backoff_delay)
# NOTE: on_error is a single fixed action (constant 1-second backoff);
# create_retry_action is not wired into this policy.
error_policy = ErrorPolicy(
    max_retries=5,
    on_error=ErrorAction(retry=True, backoff=1.0),
)

# Use with client
from uamqp import SendClient

# `target` and `auth` are not defined in this example; supply your own.
client = SendClient(target, auth=auth, error_policy=error_policy)


# Root exception classes for AMQP-related errors.
class AMQPError(Exception):
    """Base exception for all AMQP-related errors."""


class AMQPConnectionError(AMQPError):
    """Connection-related errors including network and protocol issues."""


class AMQPClientShutdown(AMQPError):
    """Exception raised when client is shutting down."""


class MessageHandlerError(AMQPError):
    """Errors in message handling and processing."""


# Errors related to AMQP connection management and lifecycle.
class ConnectionClose(AMQPConnectionError):
    """Connection was closed by the broker."""


class VendorConnectionClose(ConnectionClose):
    """Vendor-specific connection closure."""


# Errors related to AMQP links (message senders and receivers).
class LinkDetach(AMQPError):
    """Link was detached by remote peer."""


class VendorLinkDetach(LinkDetach):
    """Vendor-specific link detachment."""


class LinkRedirect(AMQPError):
    """Link redirected to different endpoint."""


# Errors related to authentication and authorization.
class AuthenticationException(AMQPError):
    """General authentication failure."""


class TokenExpired(AuthenticationException):
    """Authentication token has expired."""


class TokenAuthFailure(AuthenticationException):
    """Token authentication specifically failed."""


# Errors related to message processing and state management.
class MessageResponse(AMQPError):
    """Base class for message response errors."""


class MessageException(MessageResponse):
    """General message processing error."""


class MessageSendFailed(MessageException):
    """Message sending operation failed."""


class MessageContentTooLarge(MessageException):
    """Message exceeds maximum size limit."""


# Specific responses to message settlement that may be treated as exceptions.
class MessageAlreadySettled(MessageResponse):
    """Attempt to settle already-settled message."""


class MessageAccepted(MessageResponse):
    """Message was accepted by receiver."""


class MessageRejected(MessageResponse):
    """Message was rejected by receiver."""


class MessageReleased(MessageResponse):
    """Message was released by receiver."""


class MessageModified(MessageResponse):
    """Message was modified by receiver."""


# Errors related to operation timeouts.
class ClientTimeout(AMQPError):
    """Client operation timed out."""


# Handle common AMQP errors with appropriate recovery strategies.
from uamqp import send_message, receive_message
from uamqp.errors import (
AMQPConnectionError,
AuthenticationException,
MessageSendFailed,
ClientTimeout
)
def robust_send_message(target, message, auth):
    """Send a message, retrying recoverable failures up to three attempts.

    Parameters:
    - target: Destination passed through to send_message
    - message: Message passed through to send_message
    - auth: Authentication object passed to send_message as ``auth``

    Returns:
    - The value returned by send_message on the first successful attempt

    Raises:
    - AuthenticationException: Re-raised immediately, never retried
    - AMQPConnectionError, MessageSendFailed, ClientTimeout: Re-raised when
      the final attempt fails
    """
    # Imported locally: this example's import list does not provide `time`.
    import time

    max_attempts = 3
    for attempt in range(max_attempts):
        is_last_attempt = attempt == max_attempts - 1
        try:
            result = send_message(target, message, auth=auth)
            print(f"Message sent successfully: {result}")
            return result
        except AMQPConnectionError as e:
            print(f"Connection error (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            time.sleep(2 ** attempt)  # Exponential backoff
        except AuthenticationException as e:
            print(f"Authentication failed: {e}")
            # Don't retry auth failures
            raise
        except MessageSendFailed as e:
            print(f"Send failed (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            time.sleep(1)
        except ClientTimeout as e:
            print(f"Timeout (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            # No delay on this path: the next attempt starts immediately.


# Handle message settlement responses appropriately.
from uamqp.errors import (
MessageRejected,
MessageReleased,
MessageModified,
MessageAlreadySettled
)
def process_message_safely(message):
    """Process one received message and settle it according to the outcome.

    Parameters:
    - message: Received message; get_data, accept, reject and release are
      called on it

    Returns:
    - The result of process_business_logic when processing succeeds,
      otherwise None (every handled error path falls through)
    """
    try:
        # Process message content
        data = message.get_data()
        result = process_business_logic(data)
        # Accept message on successful processing
        message.accept()
        return result
    except ValueError as e:
        # Business logic error - reject message
        print(f"Invalid message data: {e}")
        message.reject(
            condition="invalid-data",
            description=str(e),
        )
    except MessageRejected as e:
        print(f"Message was rejected: {e}")
        # Log for analysis, don't reprocess
    except MessageReleased as e:
        print(f"Message was released: {e}")
        # Message will be redelivered
    except MessageAlreadySettled as e:
        print(f"Message already settled: {e}")
        # Continue processing, message already handled
    except Exception as e:
        # Unexpected error - release message for retry
        print(f"Unexpected error: {e}")
        try:
            message.release()
        except MessageAlreadySettled:
            pass  # Message already handled
    return None


# Use error policies for automatic retry behavior.
from uamqp import SendClient
from uamqp.errors import ErrorPolicy, ErrorAction
def create_resilient_client(target, auth):
    """Create a SendClient with a custom connection-error retry policy.

    Parameters:
    - target: Destination passed through to SendClient
    - auth: Authentication object passed to SendClient as ``auth``

    Returns:
    - SendClient configured with an ErrorPolicy (max_retries=3) whose
      connection-error handler does not retry errors mentioning
      "authentication" and retries everything else with a 2-second backoff
    """
    # Define error actions for different scenarios
    retry_action = ErrorAction(retry=True, backoff=2.0)
    no_retry_action = ErrorAction(retry=False)

    # Create custom error policy
    error_policy = ErrorPolicy(max_retries=3)

    # Override specific error handling
    def custom_connection_error_handler(error):
        if "authentication" in str(error).lower():
            return no_retry_action  # Don't retry auth errors
        return retry_action  # Retry other connection errors

    # Set on the instance, so the handler receives only the error argument.
    error_policy.on_connection_error = custom_connection_error_handler

    return SendClient(target, auth=auth, error_policy=error_policy)
# Usage
# AMQPError is caught below but is missing from this example's imports.
from uamqp.errors import AMQPError

# `target`, `auth` and `message` are not defined in this example.
try:
    with create_resilient_client(target, auth) as client:
        client.queue_message(message)
        results = client.send_all_messages()
except AMQPError as e:
    print(f"All retry attempts failed: {e}")


# Handle errors in async operations with proper coroutine error management.
import asyncio
from uamqp.async_ops import SendClientAsync
from uamqp.errors import AMQPConnectionError, MessageSendFailed
async def robust_async_send(target, messages, auth):
    """Send a batch of messages asynchronously with up to three attempts.

    Parameters:
    - target: Destination passed through to SendClientAsync
    - messages: Batch passed to send_message_batch_async
    - auth: Authentication object passed to SendClientAsync as ``auth``

    Returns:
    - The results returned by send_message_batch_async on success

    Raises:
    - AMQPConnectionError, MessageSendFailed: Re-raised when the final
      attempt fails
    """
    max_retries = 3
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            # A new client is opened for every attempt and closed on exit.
            async with SendClientAsync(target, auth=auth) as client:
                results = await client.send_message_batch_async(messages)
                print(f"Sent {len(results)} messages successfully")
                return results
        except AMQPConnectionError as e:
            print(f"Connection error (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
        except MessageSendFailed as e:
            print(f"Send failed (attempt {attempt + 1}): {e}")
            if is_last_attempt:
                raise
            await asyncio.sleep(1)
# Usage in async context
async def main():
    """Run robust_async_send once and report a failure after all retries."""
    # Imported locally: AMQPError is missing from this example's imports.
    from uamqp.errors import AMQPError

    # `target`, `messages` and `auth` are not defined in this example.
    try:
        results = await robust_async_send(target, messages, auth)
    except AMQPError as e:
        print(f"Failed after all retries: {e}")


asyncio.run(main())


# Proper resource cleanup with error handling in context managers.
from uamqp import ReceiveClient
from uamqp.errors import AMQPError
class RobustReceiveClient:
    """Context manager that opens a ReceiveClient and closes it on exit.

    Entering returns the opened ReceiveClient. Leaving closes it, prints a
    hint for connection and authentication errors, and never suppresses the
    exception raised inside the ``with`` body.
    """

    def __init__(self, source, auth, **kwargs):
        """
        Store the connection settings; no client is created yet.

        Parameters:
        - source: Source passed through to ReceiveClient
        - auth: Authentication object passed to ReceiveClient as ``auth``
        - **kwargs: Extra keyword arguments forwarded to ReceiveClient
        """
        self.source = source
        self.auth = auth
        self.client_kwargs = kwargs
        self.client = None

    def __enter__(self):
        """Create and open the client, closing it again if opening fails."""
        try:
            self.client = ReceiveClient(self.source, auth=self.auth, **self.client_kwargs)
            self.client.open()
            return self.client
        except AMQPError:
            if self.client is not None:
                try:
                    self.client.close()
                except Exception:
                    # Best-effort cleanup: the original AMQPError is what
                    # the caller needs to see. Exception (not a bare except)
                    # lets KeyboardInterrupt/SystemExit propagate.
                    pass
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the client and report the kind of error, if any.

        Returns:
        - False in every case, so exceptions are always re-raised
        """
        if self.client is not None:
            try:
                self.client.close()
            except AMQPError as e:
                print(f"Error during cleanup: {e}")
                # Don't re-raise cleanup errors

        if exc_type is None:
            return False  # Clean exit, nothing to classify

        # Imported locally: these names are missing from this example's
        # import list.
        from uamqp.errors import AMQPConnectionError, AuthenticationException

        # issubclass rather than ==, so subclasses of these errors are
        # recognised as well.
        if issubclass(exc_type, AMQPConnectionError):
            print("Connection error occurred, consider retry")
        elif issubclass(exc_type, AuthenticationException):
            print("Authentication failed, check credentials")
        return False  # Don't suppress any exception
# Usage
# `source` and `auth` are not defined in this example.
try:
    with RobustReceiveClient(source, auth) as client:
        messages = client.receive_message_batch(timeout=30000)
        for message in messages:
            process_message_safely(message)
except AMQPError as e:
    print(f"Client operation failed: {e}")


# Use debug mode to get detailed protocol-level information for troubleshooting.
import logging
# Enable uAMQP debug logging
# The root logger is configured at INFO; only the 'uamqp' logger is lowered
# to DEBUG.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('uamqp')
logger.setLevel(logging.DEBUG)

# Use debug mode in clients
# `target` and `auth` are not defined in this example.
client = SendClient(target, auth=auth, debug=True)


# Extract detailed error information for analysis and logging.
def log_error_details(error):
    """Print the type, message and any AMQP-specific details of an error.

    Parameters:
    - error: Exception instance to describe

    The condition, description and info attributes are printed, in that
    order, only when the error object has them.
    """
    print(f"Error type: {type(error).__name__}")
    print(f"Error message: {error!s}")
    # Check for AMQP-specific error details
    for attribute in ("condition", "description", "info"):
        if hasattr(error, attribute):
            print(f"AMQP {attribute}: {getattr(error, attribute)}")
try:
    # AMQP operation
    pass
except AMQPError as e:
    log_error_details(e)
    # Decide on recovery action based on error details


# Implement health checks and monitoring for AMQP connections.
import time
from uamqp.errors import AMQPError
class ConnectionHealthMonitor:
    """Track the health of an AMQP target by sending a test message."""

    def __init__(self, target, auth):
        """
        Parameters:
        - target: Destination passed through to send_message
        - auth: Authentication object passed to send_message as ``auth``
        """
        self.target = target
        self.auth = auth
        self.last_error = None
        self.error_count = 0
        # time.time() of the most recent check_health call; None until the
        # first check has run.
        self.last_check = None

    def check_health(self):
        """Send a "health-check" message and record the outcome.

        Returns:
        - True when the send succeeds (error tracking is reset), False when
          it raises AMQPError (the error is stored and counted)
        """
        # Simple health check using high-level API
        from uamqp import send_message, Message

        self.last_check = time.time()
        try:
            send_message(self.target, Message("health-check"), auth=self.auth)
        except AMQPError as e:
            self.last_error = e
            self.error_count += 1
            return False
        # Reset error tracking on success
        self.last_error = None
        self.error_count = 0
        return True

    def get_health_status(self):
        """Return a snapshot of the monitor state as a dict.

        Returns:
        - dict with keys 'healthy' (no stored error), 'last_error' (its
          string form or None), 'error_count', and 'last_check' (timestamp
          of the latest check_health call, None if it never ran)
        """
        has_error = self.last_error is not None
        return {
            'healthy': not has_error,
            'last_error': str(self.last_error) if has_error else None,
            'error_count': self.error_count,
            # Reported from the recorded check time, not from the time of
            # this status query.
            'last_check': self.last_check,
        }
# Usage
# `target` and `auth` are not defined in this example.
monitor = ConnectionHealthMonitor(target, auth)
if not monitor.check_health():
    status = monitor.get_health_status()
    print(f"Connection unhealthy: {status}")


# Install with Tessl CLI
npx tessl i tessl/pypi-uamqp