Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.
Overall score: 92%
Comprehensive exception hierarchy for handling Service Bus errors including connection issues, authentication failures, message processing errors, and service-side exceptions.
Root exception class for all Service Bus-specific errors.
class ServiceBusError(AzureError):
    """
    Root of the Service Bus exception hierarchy.

    Supplies the error handling shared by all Service Bus operations and
    carries retry information and error context.
    """

    def __init__(self, message: Optional[Union[str, bytes]], *args, **kwargs) -> None:
        """
        Create a ServiceBusError.

        Parameters:
        - message: Error message, given as a string or as bytes
        - retryable: Keyword argument; whether the operation can be retried
        - shutdown_handler: Keyword argument; whether to shut down the handler on error
        - condition: Keyword argument; the AMQP error condition
        - status_code: Keyword argument; the HTTP status code, if applicable
        """

    @property
    def retryable(self) -> bool:
        """Tell whether the error represents a retryable condition."""

    @property
    def shutdown_handler(self) -> bool:
        """Tell whether the handler should be shut down because of this error."""

    @property
    def condition(self) -> Optional[str]:
        """Return the AMQP error condition, when one is available."""

    @property
    def status_code(self) -> Optional[int]:
        """Return the HTTP status code, when one applies."""


from azure.servicebus import ServiceBusClient
from azure.servicebus.exceptions import ServiceBusError
client = ServiceBusClient.from_connection_string("your_connection_string")
try:
with client.get_queue_sender("my-queue") as sender:
sender.send_messages(ServiceBusMessage("Hello"))
except ServiceBusError as e:
print(f"Service Bus error: {e}")
print(f"Retryable: {e.retryable}")
print(f"Status code: {e.status_code}")
print(f"Condition: {e.condition}")
if e.retryable:
# Implement retry logic
retry_operation()
else:
# Handle non-retryable error
handle_permanent_failure()Exceptions related to network connectivity and communication with Service Bus.
class ServiceBusConnectionError(ServiceBusError):
    """
    Raised when the connection to Service Bus fails.

    The error is retryable; it typically points to network problems or a
    temporarily unavailable service.
    """
# In the azure-servicebus SDK this exception derives from
# ServiceBusConnectionError (and therefore still from ServiceBusError), so
# `except ServiceBusConnectionError` also catches it.
class ServiceBusCommunicationError(ServiceBusConnectionError):
    """
    A general communications error when interacting with Service Bus.

    The client cannot establish or maintain a connection to Service Bus.
    Check network connectivity, firewall settings, and Service Bus
    availability.
    """


from azure.servicebus import ServiceBusClient
from azure.servicebus.exceptions import (
ServiceBusConnectionError,
ServiceBusCommunicationError
)
import time
client = ServiceBusClient.from_connection_string("your_connection_string")
def send_with_retry(message, max_retries=3):
for attempt in range(max_retries):
try:
with client.get_queue_sender("my-queue") as sender:
sender.send_messages(message)
return # Success
except ServiceBusConnectionError as e:
print(f"Connection error (attempt {attempt + 1}): {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
else:
raise
except ServiceBusCommunicationError as e:
print(f"Communication error: {e}")
print("Check network connectivity and firewall settings")
raiseExceptions related to credential validation and access permissions.
class ServiceBusAuthenticationError(ServiceBusError):
    """
    Raised when authenticating the connection fails.

    The credentials did not pass validation; verify that they are valid and
    have not expired.
    """
class ServiceBusAuthorizationError(ServiceBusError):
    """
    Raised when authorizing the connection fails.

    Access to the requested resource was denied; verify that the credential
    holds the permissions the operation requires.
    """


from azure.servicebus import ServiceBusClient
from azure.servicebus.exceptions import (
ServiceBusAuthenticationError,
ServiceBusAuthorizationError
)
try:
client = ServiceBusClient.from_connection_string("invalid_connection_string")
with client.get_queue_sender("my-queue") as sender:
sender.send_messages(ServiceBusMessage("Hello"))
except ServiceBusAuthenticationError as e:
print(f"Authentication failed: {e}")
print("Check connection string and credentials")
except ServiceBusAuthorizationError as e:
print(f"Authorization failed: {e}")
print("Check that the credential has Send permissions for the queue")Exceptions related to message handling and settlement operations.
class MessageAlreadySettled(ValueError):
    """
    Failed to settle the message because it was already settled.

    Raised when a complete, abandon, defer, or dead-letter operation is
    attempted on a message that has already been settled. This is not a
    retryable error.
    """
class MessageLockLostError(ServiceBusError):
    """
    Raised when the lock on a message has expired.

    Once the lock expires the message is no longer held exclusively by this
    receiver and becomes available to other receivers. Retrying with the
    same message instance will not succeed.
    """
class MessageNotFoundError(ServiceBusError):
    """
    Raised when the requested message cannot be found.

    Receiving a message by a specific sequence number failed because the
    message no longer exists or has already been processed.
    """
class MessageSizeExceededError(ServiceBusError, ValueError):
    """
    Raised when message content exceeds the Service Bus frame size limit.

    The message is larger than the maximum allowed size; shrink it or split
    it into several messages.
    """


from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.servicebus.exceptions import (
MessageAlreadySettled,
MessageLockLostError,
MessageNotFoundError,
MessageSizeExceededError
)
import time
client = ServiceBusClient.from_connection_string("your_connection_string")


def process_messages_safely():
    """
    Receive up to 10 messages from "my-queue" and settle each one defensively.

    Each message is processed and completed. Settlement conflicts
    (MessageAlreadySettled) and expired locks (MessageLockLostError) are
    reported and the loop moves on to the next message. Any other failure
    leads to an attempt to abandon the message so it can be redelivered.
    """
    with client.get_queue_receiver("my-queue") as receiver:
        messages = receiver.receive_messages(max_message_count=10)
        for message in messages:
            try:
                # Simulate long processing
                result = process_message(message)
                # Attempt to complete the message
                receiver.complete_message(message)
                print(f"Successfully processed: {result}")
            except MessageAlreadySettled:
                print("Message was already settled by another process")
            except MessageLockLostError:
                # Message will be available for other receivers
                print(f"Message lock expired during processing: {message.sequence_number}")
            except Exception as processing_error:
                print(f"Error processing message: {processing_error}")
                try:
                    receiver.abandon_message(message)
                except MessageAlreadySettled:
                    print("Message was already settled during error handling")
                except MessageLockLostError:
                    # The lock can also expire while processing is failing;
                    # without this handler one expired lock would abort the
                    # rest of the batch.
                    print(f"Message lock expired during error handling: {message.sequence_number}")
def send_large_message():
try:
# Create a very large message
large_content = "x" * (1024 * 1024 * 2) # 2MB message
message = ServiceBusMessage(large_content)
with client.get_queue_sender("my-queue") as sender:
sender.send_messages(message)
except MessageSizeExceededError as e:
print(f"Message too large: {e}")
print("Consider using message batching or splitting the message")Exceptions specific to session-enabled messaging scenarios.
class SessionLockLostError(ServiceBusError):
    """
    Raised when the lock on a session has expired.

    Messages that were received but not yet settled can no longer be
    settled. Re-acquire the session to continue processing.
    """
class SessionCannotBeLockedError(ServiceBusError):
    """
    Raised when the requested session cannot be locked.

    Either another client currently holds the session lock, the session
    does not exist, or the session ID is invalid.
    """


from azure.servicebus import ServiceBusClient, NEXT_AVAILABLE_SESSION
from azure.servicebus.exceptions import (
SessionLockLostError,
SessionCannotBeLockedError
)
client = ServiceBusClient.from_connection_string("your_connection_string")
def process_session_safely(session_id):
try:
with client.get_queue_receiver("my-session-queue", session_id=session_id) as receiver:
session = receiver.session
messages = receiver.receive_messages(max_message_count=10)
for message in messages:
try:
# Process message with session state
process_with_session_state(message, session)
receiver.complete_message(message)
except SessionLockLostError:
print(f"Session {session_id} lock expired")
break # Exit and let session be re-acquired
except SessionCannotBeLockedError:
print(f"Cannot lock session {session_id} - trying next available")
# Try to get any available session instead
try:
with client.get_queue_receiver("my-session-queue", session_id=NEXT_AVAILABLE_SESSION) as receiver:
if receiver.session:
print(f"Got available session: {receiver.session.session_id}")
# Process this session instead
else:
print("No sessions available")
except SessionCannotBeLockedError:
print("No sessions available at all")Exceptions related to Service Bus entity operations.
class MessagingEntityNotFoundError(ServiceBusError):
    """
    Raised when a Service Bus resource cannot be found.

    The queue, topic, subscription, or rule that the operation targets does
    not exist or has been deleted.
    """
class MessagingEntityDisabledError(ServiceBusError):
    """
    Raised when the messaging entity is disabled.

    The entity exists but cannot process messages while disabled. Enable it
    through the Azure portal or the management API.
    """
class MessagingEntityAlreadyExistsError(ServiceBusError):
    """
    Raised when an entity with the same name already exists.

    The entity cannot be created because the namespace already contains one
    with that name.
    """


from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.servicebus.exceptions import (
MessagingEntityNotFoundError,
MessagingEntityDisabledError
)
from azure.servicebus.management import ServiceBusAdministrationClient
from azure.servicebus.exceptions import MessagingEntityAlreadyExistsError
client = ServiceBusClient.from_connection_string("your_connection_string")
admin_client = ServiceBusAdministrationClient.from_connection_string("your_connection_string")
def safe_queue_operations(queue_name):
try:
# Try to send to queue
with client.get_queue_sender(queue_name) as sender:
sender.send_messages(ServiceBusMessage("Hello"))
except MessagingEntityNotFoundError:
print(f"Queue {queue_name} does not exist - creating it")
try:
admin_client.create_queue(queue_name)
print(f"Created queue {queue_name}")
# Retry the send operation
with client.get_queue_sender(queue_name) as sender:
sender.send_messages(ServiceBusMessage("Hello"))
except MessagingEntityAlreadyExistsError:
print(f"Queue {queue_name} was created by another process")
except MessagingEntityDisabledError:
print(f"Queue {queue_name} is disabled - check Azure portal")Exceptions related to Service Bus service conditions and limits.
class ServiceBusQuotaExceededError(ServiceBusError):
    """
    Raised when a Service Bus resource quota has been exceeded.

    Either the messaging entity reached its maximum size or the namespace
    exceeded its maximum number of connections.
    """
class ServiceBusServerBusyError(ServiceBusError):
    """
    Raised when the Service Bus service reports that it is busy.

    The request cannot be processed right now; the client should wait and
    retry the operation after a delay.
    """
class OperationTimeoutError(ServiceBusError):
    """
    Raised when an operation times out.

    The operation did not finish within the specified timeout period.
    Typically this error is retryable.
    """


from azure.servicebus import ServiceBusClient
from azure.servicebus.exceptions import (
ServiceBusQuotaExceededError,
ServiceBusServerBusyError,
OperationTimeoutError
)
import time
import random
client = ServiceBusClient.from_connection_string("your_connection_string")
def send_with_service_error_handling(message):
max_retries = 5
base_delay = 1
for attempt in range(max_retries):
try:
with client.get_queue_sender("my-queue", timeout=30) as sender:
sender.send_messages(message)
return # Success
except ServiceBusQuotaExceededError as e:
print(f"Quota exceeded: {e}")
print("Cannot send more messages - queue/namespace is full")
raise # Don't retry quota errors
except ServiceBusServerBusyError as e:
print(f"Service busy (attempt {attempt + 1}): {e}")
if attempt < max_retries - 1:
# Exponential backoff with jitter
delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
time.sleep(delay)
else:
raise
except OperationTimeoutError as e:
print(f"Operation timeout (attempt {attempt + 1}): {e}")
if attempt < max_retries - 1:
time.sleep(base_delay * (attempt + 1))
else:
raiseExceptions related to automatic lock renewal operations.
class AutoLockRenewFailed(ServiceBusError):
    """
    Raised when a background attempt to renew a message or session lock fails.

    The auto-renewal process hit an error; inspect the specific error
    details to decide whether manual intervention is needed.
    """
class AutoLockRenewTimeout(ServiceBusError):
    """
    Raised when the time allotted for renewing a message or session lock runs out.

    The maximum renewal duration was reached, so no further renewal
    attempts are made.
    """


from azure.servicebus import ServiceBusClient, AutoLockRenewer
from azure.servicebus.exceptions import (
AutoLockRenewFailed,
AutoLockRenewTimeout
)
def handle_auto_renew_errors(renewable, error):
    """Report an auto-renewal failure for ``renewable`` according to the type of ``error``."""
    if isinstance(error, AutoLockRenewFailed):
        # Log the error and potentially implement fallback logic
        print(f"Auto-renewal failed for {renewable}: {error}")
        return

    if isinstance(error, AutoLockRenewTimeout):
        # The renewal period has ended - message/session lock will expire
        print(f"Auto-renewal timeout for {renewable}: {error}")
        return

    print(f"Unexpected auto-renewal error: {error}")
client = ServiceBusClient.from_connection_string("your_connection_string")
auto_renewer = AutoLockRenewer(
    max_lock_renewal_duration=300,  # 5 minutes
    on_lock_renew_failure=handle_auto_renew_errors,
)

# Receive a small batch and keep each message lock alive in the background
# while the long-running processing takes place.
try:
    with client.get_queue_receiver("my-queue") as receiver:
        messages = receiver.receive_messages(max_message_count=5)
        for message in messages:
            # Register for auto-renewal. register() takes the receiver that
            # owns the lock first, then the renewable; the duration keyword
            # is max_lock_renewal_duration (there is no `timeout` keyword).
            auto_renewer.register(receiver, message, max_lock_renewal_duration=300)
            try:
                # Long-running processing
                long_running_process(message)
                receiver.complete_message(message)
            except Exception as e:
                print(f"Processing error: {e}")
                receiver.abandon_message(message)
finally:
    # Stop the background renewal workers, then release the connection.
    auto_renewer.close()
    client.close()

from azure.servicebus import ServiceBusClient, ServiceBusMessage, AutoLockRenewer
from azure.servicebus.exceptions import *
from azure.servicebus.management import ServiceBusAdministrationClient
import logging
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def robust_message_processing():
    """
    Receive and process messages from "my-queue" until the queue disappears.

    Messages are received in batches of up to 10 and handed to
    ``process_single_message``. Timeouts, a busy service, and connection
    problems are logged and the loop carries on (after a pause where
    appropriate). A missing queue ends the loop. Authentication and
    authorization failures are logged and re-raised. The client is always
    closed on exit.
    """
    client = ServiceBusClient.from_connection_string("your_connection_string")
    try:
        with client.get_queue_receiver("my-queue") as receiver:
            while True:
                try:
                    batch = receiver.receive_messages(max_message_count=10, max_wait_time=30)
                    if not batch:
                        logger.info("No messages received")
                    for msg in batch:
                        try:
                            # Process individual message
                            process_single_message(receiver, msg)
                        except MessageAlreadySettled:
                            logger.warning(f"Message {msg.sequence_number} already settled")
                        except MessageLockLostError:
                            logger.warning(f"Lock lost for message {msg.sequence_number}")
                except OperationTimeoutError:
                    logger.info("Receive operation timed out - continuing")
                except ServiceBusServerBusyError:
                    logger.warning("Service busy - backing off")
                    time.sleep(30)
                except MessagingEntityNotFoundError:
                    logger.error("Queue not found - exiting")
                    break
                except (ServiceBusConnectionError, ServiceBusCommunicationError) as e:
                    logger.error(f"Connection issue: {e} - retrying")
                    time.sleep(5)
    except ServiceBusAuthenticationError:
        logger.error("Authentication failed - check credentials")
        raise
    except ServiceBusAuthorizationError:
        logger.error("Authorization failed - check permissions")
        raise
    finally:
        client.close()
def process_single_message(receiver, message):
"""Process a single message with proper error handling."""
try:
# Your message processing logic here
result = your_message_processor(message)
receiver.complete_message(message)
logger.info(f"Successfully processed message {message.sequence_number}")
except Exception as processing_error:
logger.error(f"Error processing message {message.sequence_number}: {processing_error}")
# Decide whether to abandon or dead-letter based on error type and delivery count
if message.delivery_count and message.delivery_count >= 3:
receiver.dead_letter_message(
message,
reason="ProcessingFailed",
error_description=str(processing_error)
)
logger.warning(f"Dead-lettered message {message.sequence_number} after {message.delivery_count} attempts")
else:
receiver.abandon_message(message)
logger.info(f"Abandoned message {message.sequence_number} for retry")This comprehensive error handling pattern demonstrates how to handle all major exception types that can occur during Service Bus operations, with appropriate retry logic and logging.
Install with Tessl CLI
npx tessl i tessl/pypi-azure-servicebus
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10