Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.
npx @tessl/cli install tessl/pypi-azure-servicebus@7.14.0

Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications. This library enables building distributed applications with reliable message delivery, publish-subscribe patterns, and complex messaging workflows using queues, topics, and subscriptions with advanced features like message sessions, dead letter queues, scheduled messages, and transaction support.
pip install azure-servicebus

from azure.servicebus import ServiceBusClient, ServiceBusMessage

For asynchronous operations:
from azure.servicebus.aio import ServiceBusClient, ServiceBusSender, ServiceBusReceiver

For management operations:
from azure.servicebus.management import ServiceBusAdministrationClient
from azure.servicebus.aio.management import ServiceBusAdministrationClient

For exception handling:
from azure.servicebus.exceptions import (
ServiceBusError,
ServiceBusConnectionError,
MessageLockLostError,
SessionLockLostError
)

For utility functions:
from azure.servicebus import (
parse_connection_string,
ServiceBusConnectionStringProperties,
AutoLockRenewer
)

from azure.servicebus import ServiceBusClient, ServiceBusMessage
# Create client from connection string
client = ServiceBusClient.from_connection_string("your_connection_string")
# Send a message to a queue
with client.get_queue_sender("queue_name") as sender:
message = ServiceBusMessage("Hello, Service Bus!")
sender.send_messages(message)
# Receive messages from a queue
with client.get_queue_receiver("queue_name") as receiver:
messages = receiver.receive_messages(max_message_count=10, max_wait_time=5)
for message in messages:
print(f"Received: {message}")
receiver.complete_message(message)
client.close()

Azure Service Bus provides a comprehensive messaging platform with the following key components:
The library supports both synchronous and asynchronous patterns, automatic connection management, retry policies, and built-in error handling, making it suitable for building cloud-native and hybrid distributed applications.
Core client functionality for establishing connections and creating messaging components. Provides connection string parsing, credential management, and factory methods for senders and receivers.
class ServiceBusClient:
def __init__(self, fully_qualified_namespace: str, credential, **kwargs): ...
@classmethod
def from_connection_string(cls, conn_str: str, **kwargs) -> 'ServiceBusClient': ...
def get_queue_sender(self, queue_name: str, **kwargs) -> 'ServiceBusSender': ...
def get_queue_receiver(self, queue_name: str, **kwargs) -> 'ServiceBusReceiver': ...
def get_topic_sender(self, topic_name: str, **kwargs) -> 'ServiceBusSender': ...
def get_subscription_receiver(self, topic_name: str, subscription_name: str, **kwargs) -> 'ServiceBusReceiver': ...
def close(self) -> None: ...

Comprehensive message sending and receiving operations including batch processing, message scheduling, and various settlement patterns for reliable message processing.
class ServiceBusSender:
def send_messages(self, message, **kwargs) -> None: ...
def schedule_messages(self, messages, schedule_time_utc, **kwargs) -> List[int]: ...
def cancel_scheduled_messages(self, sequence_numbers, **kwargs) -> None: ...
def create_message_batch(self, max_size_in_bytes=None) -> 'ServiceBusMessageBatch': ...
class ServiceBusReceiver:
def receive_messages(self, max_message_count=1, max_wait_time=None) -> List['ServiceBusReceivedMessage']: ...
def peek_messages(self, max_message_count=1, **kwargs) -> List['ServiceBusReceivedMessage']: ...
def complete_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
def abandon_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
def defer_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
def dead_letter_message(self, message: 'ServiceBusReceivedMessage', reason=None, error_description=None) -> None: ...

Stateful messaging through sessions enabling ordered processing, session state management, and advanced messaging patterns for applications requiring message correlation and ordered delivery.
class ServiceBusSession:
@property
def session_id(self) -> str: ...
@property
def locked_until_utc(self) -> Optional[datetime]: ...
def get_state(self, **kwargs) -> bytes: ...
def set_state(self, state, **kwargs) -> None: ...
def renew_lock(self, **kwargs) -> datetime: ...

Complete Service Bus entity management including queues, topics, subscriptions, and rules. Supports creation, configuration, monitoring, and deletion of messaging entities.
class ServiceBusAdministrationClient:
def __init__(self, fully_qualified_namespace: str, credential, **kwargs): ...
@classmethod
def from_connection_string(cls, conn_str: str, **kwargs) -> 'ServiceBusAdministrationClient': ...
# Queue Management
def create_queue(self, queue_name: str, **kwargs) -> 'QueueProperties': ...
def get_queue(self, queue_name: str, **kwargs) -> 'QueueProperties': ...
def delete_queue(self, queue_name: str, **kwargs) -> None: ...
def list_queues(self, **kwargs): ...
# Topic Management
def create_topic(self, topic_name: str, **kwargs) -> 'TopicProperties': ...
def get_topic(self, topic_name: str, **kwargs) -> 'TopicProperties': ...
def delete_topic(self, topic_name: str, **kwargs) -> None: ...
# Subscription Management
def create_subscription(self, topic_name: str, subscription_name: str, **kwargs) -> 'SubscriptionProperties': ...
def delete_subscription(self, topic_name: str, subscription_name: str, **kwargs) -> None: ...

Message data structures, properties, and utility classes for creating, configuring, and processing Service Bus messages with full metadata support.
class ServiceBusMessage:
def __init__(self, body, **kwargs): ...
@property
def body(self): ...
@property
def application_properties(self) -> Optional[Dict]: ...
@property
def session_id(self) -> Optional[str]: ...
@property
def message_id(self) -> Optional[str]: ...
@property
def scheduled_enqueue_time_utc(self) -> Optional[datetime]: ...
class ServiceBusReceivedMessage(ServiceBusMessage):
@property
def delivery_count(self) -> Optional[int]: ...
@property
def enqueued_time_utc(self) -> Optional[datetime]: ...
@property
def sequence_number(self) -> Optional[int]: ...
@property
def lock_token(self) -> Optional[Union[uuid.UUID, str]]: ...
@property
def locked_until_utc(self) -> Optional[datetime]: ...
class ServiceBusMessageBatch:
@property
def max_size_in_bytes(self) -> int: ...
@property
def size_in_bytes(self) -> int: ...
def add_message(self, message) -> None: ...

Enumeration values and constants for configuring Service Bus behavior including receive modes, message states, transport types, and entity sub-queues.
class ServiceBusReceiveMode(str, Enum):
PEEK_LOCK = "peeklock"
RECEIVE_AND_DELETE = "receiveanddelete"
class ServiceBusMessageState(int, Enum):
ACTIVE = 0
DEFERRED = 1
SCHEDULED = 2
class ServiceBusSubQueue(str, Enum):
DEAD_LETTER = "deadletter"
TRANSFER_DEAD_LETTER = "transferdeadletter"
class TransportType(Enum):
Amqp = "Amqp"
AmqpOverWebsocket = "AmqpOverWebsocket"
NEXT_AVAILABLE_SESSION: ServiceBusSessionFilter

Automatic lock renewal for messages and sessions to prevent lock expiration during long processing operations. Essential for production scenarios requiring extended message processing time.
class AutoLockRenewer:
def __init__(
self,
max_lock_renewal_duration: float = 300,
on_lock_renew_failure: Optional[LockRenewFailureCallback] = None,
executor: Optional[ThreadPoolExecutor] = None,
max_workers: Optional[int] = None
): ...
def register(
self,
receiver: ServiceBusReceiver,
renewable: Union[ServiceBusReceivedMessage, ServiceBusSession],
max_lock_renewal_duration: Optional[float] = None
) -> None: ...
def close(self, wait: bool = True) -> None: ...
def __enter__(self) -> 'AutoLockRenewer': ...
def __exit__(self, *args) -> None: ...

Comprehensive exception hierarchy for handling Service Bus errors including connection issues, authentication failures, message processing errors, and service-side exceptions.
class ServiceBusError(AzureError):
def __init__(self, message, **kwargs): ...
# Connection and Communication Errors
class ServiceBusConnectionError(ServiceBusError): ...
class ServiceBusCommunicationError(ServiceBusError): ...
class ServiceBusAuthenticationError(ServiceBusError): ...
class ServiceBusAuthorizationError(ServiceBusError): ...
# Message Processing Errors
class MessageAlreadySettled(ValueError): ...
class MessageLockLostError(ServiceBusError): ...
class MessageNotFoundError(ServiceBusError): ...
class MessageSizeExceededError(ServiceBusError, ValueError): ...
# Session Errors
class SessionLockLostError(ServiceBusError): ...
class SessionCannotBeLockedError(ServiceBusError): ...
# Auto Lock Renewal Errors
class AutoLockRenewFailed(ServiceBusError): ...
class AutoLockRenewTimeout(ServiceBusError): ...
# Service Errors
class MessagingEntityNotFoundError(ServiceBusError): ...
class MessagingEntityDisabledError(ServiceBusError): ...
class ServiceBusQuotaExceededError(ServiceBusError): ...
class ServiceBusServerBusyError(ServiceBusError): ...
class OperationTimeoutError(ServiceBusError): ...

# Type aliases for common parameter types
MessageTypes = Union[ServiceBusMessage, AmqpAnnotatedMessage, List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]]
PrimitiveTypes = Union[int, float, str, bool, bytes, None]
NextAvailableSessionType = ServiceBusSessionFilter
LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ServiceBusReceivedMessage], Optional[Exception]], None]
# Credential types (from azure-core and azure-identity)
TokenCredential = Any # azure.core.credentials.TokenCredential
AzureSasCredential = Any # azure.core.credentials.AzureSasCredential
AzureNamedKeyCredential = Any # azure.core.credentials.AzureNamedKeyCredential
# Utility functions
def parse_connection_string(conn_str: str) -> ServiceBusConnectionStringProperties: ...
# Connection string properties
class ServiceBusConnectionStringProperties:
@property
def endpoint(self) -> str: ...
@property
def entity_path(self) -> Optional[str]: ...
@property
def shared_access_key_name(self) -> Optional[str]: ...
@property
def shared_access_key(self) -> Optional[str]: ...
@property
def shared_access_signature(self) -> Optional[str]: ...