CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-servicebus

Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.

Overall
score

92%

Overview
Eval results
Files

exception-handling.mddocs/

Exception Handling

Comprehensive exception hierarchy for handling Service Bus errors including connection issues, authentication failures, message processing errors, and service-side exceptions.

Capabilities

Base Exception

Root exception class for all Service Bus-specific errors.

class ServiceBusError(AzureError):
    """
    Base exception for all Service Bus errors.
    
    Provides common error handling for all Service Bus operations and includes
    retry information and error context.
    """
    def __init__(
        self, 
        message: Optional[Union[str, bytes]], 
        *args, 
        **kwargs
    ) -> None:
        """
        Initialize ServiceBusError.

        Parameters:
        - message: Error message (string or bytes)
        - retryable: Whether the operation can be retried (keyword arg)
        - shutdown_handler: Whether to shutdown handler on error (keyword arg)
        - condition: AMQP error condition (keyword arg)
        - status_code: HTTP status code if applicable (keyword arg)
        """
    
    @property
    def retryable(self) -> bool:
        """Whether this error indicates a retryable condition."""
    
    @property
    def shutdown_handler(self) -> bool:
        """Whether the handler should be shutdown due to this error."""
    
    @property
    def condition(self) -> Optional[str]:
        """AMQP error condition if available."""
    
    @property
    def status_code(self) -> Optional[int]:
        """HTTP status code if applicable."""

Usage Example

from azure.servicebus import ServiceBusClient
from azure.servicebus.exceptions import ServiceBusError

client = ServiceBusClient.from_connection_string("your_connection_string")

try:
    with client.get_queue_sender("my-queue") as sender:
        sender.send_messages(ServiceBusMessage("Hello"))
        
except ServiceBusError as e:
    print(f"Service Bus error: {e}")
    print(f"Retryable: {e.retryable}")
    print(f"Status code: {e.status_code}")
    print(f"Condition: {e.condition}")
    
    if e.retryable:
        # Implement retry logic
        retry_operation()
    else:
        # Handle non-retryable error
        handle_permanent_failure()

Connection and Communication Errors

Exceptions related to network connectivity and communication with Service Bus.

class ServiceBusConnectionError(ServiceBusError):
    """
    An error occurred in the connection to Service Bus.
    
    This is a retryable error that typically indicates network issues
    or temporary service unavailability.
    """

class ServiceBusCommunicationError(ServiceBusError):
    """
    A general communications error when interacting with Service Bus.
    
    Client cannot establish or maintain a connection to Service Bus.
    Check network connectivity, firewall settings, and Service Bus availability.
    """

Usage Example

from azure.servicebus import ServiceBusClient
from azure.servicebus.exceptions import (
    ServiceBusConnectionError,
    ServiceBusCommunicationError
)
import time

client = ServiceBusClient.from_connection_string("your_connection_string")

def send_with_retry(message, max_retries=3):
    for attempt in range(max_retries):
        try:
            with client.get_queue_sender("my-queue") as sender:
                sender.send_messages(message)
            return  # Success
            
        except ServiceBusConnectionError as e:
            print(f"Connection error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                raise
                
        except ServiceBusCommunicationError as e:
            print(f"Communication error: {e}")
            print("Check network connectivity and firewall settings")
            raise

Authentication and Authorization Errors

Exceptions related to credential validation and access permissions.

class ServiceBusAuthenticationError(ServiceBusError):
    """
    An error occurred when authenticating the connection.
    
    Credential validation failed. Check that credentials are valid
    and not expired.
    """

class ServiceBusAuthorizationError(ServiceBusError):
    """
    An error occurred when authorizing the connection.
    
    Access to the requested resource is denied. Check that the credential
    has appropriate permissions for the operation.
    """

Usage Example

from azure.servicebus import ServiceBusClient
from azure.servicebus.exceptions import (
    ServiceBusAuthenticationError,
    ServiceBusAuthorizationError
)

try:
    client = ServiceBusClient.from_connection_string("invalid_connection_string")
    with client.get_queue_sender("my-queue") as sender:
        sender.send_messages(ServiceBusMessage("Hello"))
        
except ServiceBusAuthenticationError as e:
    print(f"Authentication failed: {e}")
    print("Check connection string and credentials")
    
except ServiceBusAuthorizationError as e:
    print(f"Authorization failed: {e}")
    print("Check that the credential has Send permissions for the queue")

Message Processing Errors

Exceptions related to message handling and settlement operations.

class MessageAlreadySettled(ValueError):
    """
    Failed to settle the message because it was already settled.
    
    An attempt was made to complete, abandon, defer, or dead-letter a message
    that has already been settled. This is not a retryable error.
    """

class MessageLockLostError(ServiceBusError):
    """
    The lock on the message has expired.
    
    The message lock has expired and the message is no longer exclusively
    locked to this receiver. The message will become available to other
    receivers. This is not a retryable error for the same message instance.
    """

class MessageNotFoundError(ServiceBusError):
    """
    The requested message was not found.
    
    Attempt to receive a message with a specific sequence number failed
    because the message no longer exists or has been processed.
    """

class MessageSizeExceededError(ServiceBusError, ValueError):
    """
    Message content is larger than the Service Bus frame size limit.
    
    The message exceeds the maximum allowed size. Reduce message size
    or split into multiple messages.
    """

Usage Example

from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.servicebus.exceptions import (
    MessageAlreadySettled,
    MessageLockLostError,
    MessageNotFoundError,
    MessageSizeExceededError
)
import time

client = ServiceBusClient.from_connection_string("your_connection_string")

def process_messages_safely():
    with client.get_queue_receiver("my-queue") as receiver:
        messages = receiver.receive_messages(max_message_count=10)
        
        for message in messages:
            try:
                # Simulate long processing
                result = process_message(message)
                
                # Attempt to complete the message
                receiver.complete_message(message)
                print(f"Successfully processed: {result}")
                
            except MessageAlreadySettled:
                print("Message was already settled by another process")
                continue
                
            except MessageLockLostError:
                print(f"Message lock expired during processing: {message.sequence_number}")
                # Message will be available for other receivers
                continue
                
            except Exception as processing_error:
                print(f"Error processing message: {processing_error}")
                try:
                    receiver.abandon_message(message)
                except MessageAlreadySettled:
                    print("Message was already settled during error handling")

def send_large_message():
    try:
        # Create a very large message
        large_content = "x" * (1024 * 1024 * 2)  # 2MB message
        message = ServiceBusMessage(large_content)
        
        with client.get_queue_sender("my-queue") as sender:
            sender.send_messages(message)
            
    except MessageSizeExceededError as e:
        print(f"Message too large: {e}")
        print("Consider using message batching or splitting the message")

Session-Related Errors

Exceptions specific to session-enabled messaging scenarios.

class SessionLockLostError(ServiceBusError):
    """
    The lock on the session has expired.
    
    All unsettled messages that have been received can no longer be settled.
    The session must be re-acquired to continue processing.
    """

class SessionCannotBeLockedError(ServiceBusError):
    """
    The requested session cannot be locked.
    
    The session is currently locked by another client, does not exist,
    or the session ID is invalid.
    """

Usage Example

from azure.servicebus import ServiceBusClient, NEXT_AVAILABLE_SESSION
from azure.servicebus.exceptions import (
    SessionLockLostError,
    SessionCannotBeLockedError
)

client = ServiceBusClient.from_connection_string("your_connection_string")

def process_session_safely(session_id):
    try:
        with client.get_queue_receiver("my-session-queue", session_id=session_id) as receiver:
            session = receiver.session
            
            messages = receiver.receive_messages(max_message_count=10)
            for message in messages:
                try:
                    # Process message with session state
                    process_with_session_state(message, session)
                    receiver.complete_message(message)
                    
                except SessionLockLostError:
                    print(f"Session {session_id} lock expired")
                    break  # Exit and let session be re-acquired
                    
    except SessionCannotBeLockedError:
        print(f"Cannot lock session {session_id} - trying next available")
        
        # Try to get any available session instead
        try:
            with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
                if receiver.session:
                    print(f"Got available session: {receiver.session.session_id}")
                    # Process this session instead
                else:
                    print("No sessions available")
        except SessionCannotBeLockedError:
            print("No sessions available at all")

Entity Management Errors

Exceptions related to Service Bus entity operations.

class MessagingEntityNotFoundError(ServiceBusError):
    """
    A Service Bus resource cannot be found.
    
    The entity (queue, topic, subscription, or rule) associated with the
    operation does not exist or has been deleted.
    """

class MessagingEntityDisabledError(ServiceBusError):
    """
    The messaging entity is disabled.
    
    The entity exists but is disabled and cannot process messages.
    Enable the entity using the Azure portal or management API.
    """

class MessagingEntityAlreadyExistsError(ServiceBusError):
    """
    An entity with the same name already exists.
    
    Cannot create an entity because one with the same name already exists
    in the namespace.
    """

Usage Example

from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.servicebus.exceptions import (
    MessagingEntityNotFoundError,
    MessagingEntityDisabledError
)
from azure.servicebus.management import ServiceBusAdministrationClient
from azure.servicebus.exceptions import MessagingEntityAlreadyExistsError

client = ServiceBusClient.from_connection_string("your_connection_string")
admin_client = ServiceBusAdministrationClient.from_connection_string("your_connection_string")

def safe_queue_operations(queue_name):
    try:
        # Try to send to queue
        with client.get_queue_sender(queue_name) as sender:
            sender.send_messages(ServiceBusMessage("Hello"))
            
    except MessagingEntityNotFoundError:
        print(f"Queue {queue_name} does not exist - creating it")
        try:
            admin_client.create_queue(queue_name)
            print(f"Created queue {queue_name}")
            # Retry the send operation
            with client.get_queue_sender(queue_name) as sender:
                sender.send_messages(ServiceBusMessage("Hello"))
        except MessagingEntityAlreadyExistsError:
            print(f"Queue {queue_name} was created by another process")
            
    except MessagingEntityDisabledError:
        print(f"Queue {queue_name} is disabled - check Azure portal")

Service-Level Errors

Exceptions related to Service Bus service conditions and limits.

class ServiceBusQuotaExceededError(ServiceBusError):
    """
    A Service Bus resource quota has been exceeded.
    
    The messaging entity has reached its maximum size, or the maximum
    number of connections to a namespace has been exceeded.
    """

class ServiceBusServerBusyError(ServiceBusError):
    """
    The Service Bus service reports that it is busy.
    
    The service cannot process the request at this time. Client should
    wait and retry the operation after a delay.
    """

class OperationTimeoutError(ServiceBusError):
    """
    The operation timed out.
    
    The operation did not complete within the specified timeout period.
    This is typically a retryable error.
    """

Usage Example

from azure.servicebus import ServiceBusClient
from azure.servicebus.exceptions import (
    ServiceBusQuotaExceededError,
    ServiceBusServerBusyError,
    OperationTimeoutError
)
import time
import random

client = ServiceBusClient.from_connection_string("your_connection_string")

def send_with_service_error_handling(message):
    max_retries = 5
    base_delay = 1
    
    for attempt in range(max_retries):
        try:
            with client.get_queue_sender("my-queue", timeout=30) as sender:
                sender.send_messages(message)
            return  # Success
            
        except ServiceBusQuotaExceededError as e:
            print(f"Quota exceeded: {e}")
            print("Cannot send more messages - queue/namespace is full")
            raise  # Don't retry quota errors
            
        except ServiceBusServerBusyError as e:
            print(f"Service busy (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                # Exponential backoff with jitter
                delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(delay)
            else:
                raise
                
        except OperationTimeoutError as e:
            print(f"Operation timeout (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(base_delay * (attempt + 1))
            else:
                raise

Lock Renewal Errors

Exceptions related to automatic lock renewal operations.

class AutoLockRenewFailed(ServiceBusError):
    """
    An attempt to renew a lock on a message or session in the background has failed.
    
    The auto-renewal process encountered an error. Check the specific error
    details to determine if manual intervention is required.
    """

class AutoLockRenewTimeout(ServiceBusError):
    """
    The time allocated to renew the message or session lock has elapsed.
    
    The maximum renewal duration has been reached and no further renewal
    attempts will be made.
    """

Usage Example

from azure.servicebus import ServiceBusClient, AutoLockRenewer
from azure.servicebus.exceptions import (
    AutoLockRenewFailed,
    AutoLockRenewTimeout
)

def handle_auto_renew_errors(renewable, error):
    """Callback for auto-renewal failures."""
    if isinstance(error, AutoLockRenewFailed):
        print(f"Auto-renewal failed for {renewable}: {error}")
        # Log the error and potentially implement fallback logic
        
    elif isinstance(error, AutoLockRenewTimeout):
        print(f"Auto-renewal timeout for {renewable}: {error}")
        # The renewal period has ended - message/session lock will expire
        
    else:
        print(f"Unexpected auto-renewal error: {error}")

client = ServiceBusClient.from_connection_string("your_connection_string")
auto_renewer = AutoLockRenewer(
    max_lock_renewal_duration=300,  # 5 minutes
    on_lock_renew_failure=handle_auto_renew_errors
)

try:
    with client.get_queue_receiver("my-queue") as receiver:
        messages = receiver.receive_messages(max_message_count=5)
        
        for message in messages:
            # Register for auto-renewal
            auto_renewer.register(message, timeout=300)
            
            try:
                # Long-running processing
                result = long_running_process(message)
                receiver.complete_message(message)
                
            except Exception as e:
                print(f"Processing error: {e}")
                receiver.abandon_message(message)
                
finally:
    auto_renewer.close()

Error Handling Best Practices

Comprehensive Error Handling Pattern

from azure.servicebus import ServiceBusClient, ServiceBusMessage, AutoLockRenewer
from azure.servicebus.exceptions import *
from azure.servicebus.management import ServiceBusAdministrationClient
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def robust_message_processing():
    client = ServiceBusClient.from_connection_string("your_connection_string")
    
    try:
        with client.get_queue_receiver("my-queue") as receiver:
            while True:
                try:
                    messages = receiver.receive_messages(
                        max_message_count=10, 
                        max_wait_time=30
                    )
                    
                    if not messages:
                        logger.info("No messages received")
                        continue
                    
                    for message in messages:
                        try:
                            # Process individual message
                            process_single_message(receiver, message)
                            
                        except MessageAlreadySettled:
                            logger.warning(f"Message {message.sequence_number} already settled")
                            continue
                            
                        except MessageLockLostError:
                            logger.warning(f"Lock lost for message {message.sequence_number}")
                            continue
                            
                except OperationTimeoutError:
                    logger.info("Receive operation timed out - continuing")
                    continue
                    
                except ServiceBusServerBusyError:
                    logger.warning("Service busy - backing off")
                    time.sleep(30)
                    continue
                    
                except MessagingEntityNotFoundError:
                    logger.error("Queue not found - exiting")
                    break
                    
                except (ServiceBusConnectionError, ServiceBusCommunicationError) as e:
                    logger.error(f"Connection issue: {e} - retrying")
                    time.sleep(5)
                    continue
                    
    except ServiceBusAuthenticationError:
        logger.error("Authentication failed - check credentials")
        raise
        
    except ServiceBusAuthorizationError:
        logger.error("Authorization failed - check permissions")
        raise
        
    finally:
        client.close()

def process_single_message(receiver, message):
    """Process a single message with proper error handling."""
    try:
        # Your message processing logic here
        result = your_message_processor(message)
        receiver.complete_message(message)
        logger.info(f"Successfully processed message {message.sequence_number}")
        
    except Exception as processing_error:
        logger.error(f"Error processing message {message.sequence_number}: {processing_error}")
        
        # Decide whether to abandon or dead-letter based on error type and delivery count
        if message.delivery_count and message.delivery_count >= 3:
            receiver.dead_letter_message(
                message,
                reason="ProcessingFailed",
                error_description=str(processing_error)
            )
            logger.warning(f"Dead-lettered message {message.sequence_number} after {message.delivery_count} attempts")
        else:
            receiver.abandon_message(message)
            logger.info(f"Abandoned message {message.sequence_number} for retry")

This comprehensive error handling pattern demonstrates how to handle all major exception types that can occur during Service Bus operations, with appropriate retry logic and logging.

Install with Tessl CLI

npx tessl i tessl/pypi-azure-servicebus

docs

administrative-operations.md

client-management.md

constants-enums.md

exception-handling.md

index.md

message-operations.md

message-types.md

session-management.md

tile.json