or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

administrative-operations.mdclient-management.mdconstants-enums.mdexception-handling.mdindex.mdmessage-operations.mdmessage-types.mdsession-management.md
tile.json

tessl/pypi-azure-servicebus

Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/azure-servicebus@7.14.x

To install, run

npx @tessl/cli install tessl/pypi-azure-servicebus@7.14.0

index.mddocs/

Azure Service Bus

Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications. This library enables building distributed applications with reliable message delivery, publish-subscribe patterns, and complex messaging workflows using queues, topics, and subscriptions with advanced features like message sessions, dead letter queues, scheduled messages, and transaction support.

Package Information

  • Package Name: azure-servicebus
  • Language: Python
  • Installation: pip install azure-servicebus

Core Imports

from azure.servicebus import ServiceBusClient, ServiceBusMessage

For asynchronous operations:

from azure.servicebus.aio import ServiceBusClient, ServiceBusSender, ServiceBusReceiver

For management operations:

from azure.servicebus.management import ServiceBusAdministrationClient
from azure.servicebus.aio.management import ServiceBusAdministrationClient

For exception handling:

from azure.servicebus.exceptions import (
    ServiceBusError,
    ServiceBusConnectionError,
    MessageLockLostError,
    SessionLockLostError
)

For utility functions:

from azure.servicebus import (
    parse_connection_string,
    ServiceBusConnectionStringProperties,
    AutoLockRenewer
)

Basic Usage

from azure.servicebus import ServiceBusClient, ServiceBusMessage

# Create client from connection string
client = ServiceBusClient.from_connection_string("your_connection_string")

# Send a message to a queue
with client.get_queue_sender("queue_name") as sender:
    message = ServiceBusMessage("Hello, Service Bus!")
    sender.send_messages(message)

# Receive messages from a queue
with client.get_queue_receiver("queue_name") as receiver:
    messages = receiver.receive_messages(max_message_count=10, max_wait_time=5)
    for message in messages:
        print(f"Received: {message}")
        receiver.complete_message(message)

client.close()

Architecture

Azure Service Bus provides a comprehensive messaging platform with the following key components:

  • ServiceBusClient: Main entry point for creating senders and receivers
  • ServiceBusSender: Sends messages to queues or topics with batching and scheduling capabilities
  • ServiceBusReceiver: Receives and processes messages with multiple settlement options
  • ServiceBusSession: Manages stateful messaging sessions for ordered processing
  • ServiceBusAdministrationClient: Administrative operations for creating and managing entities

The library supports both synchronous and asynchronous patterns, automatic connection management, retry policies, and built-in error handling, making it suitable for building cloud-native and hybrid distributed applications.

Capabilities

Client Management

Core client functionality for establishing connections and creating messaging components. Provides connection string parsing, credential management, and factory methods for senders and receivers.

class ServiceBusClient:
    def __init__(self, fully_qualified_namespace: str, credential, **kwargs): ...
    @classmethod
    def from_connection_string(cls, conn_str: str, **kwargs) -> 'ServiceBusClient': ...
    def get_queue_sender(self, queue_name: str, **kwargs) -> 'ServiceBusSender': ...
    def get_queue_receiver(self, queue_name: str, **kwargs) -> 'ServiceBusReceiver': ...
    def get_topic_sender(self, topic_name: str, **kwargs) -> 'ServiceBusSender': ...
    def get_subscription_receiver(self, topic_name: str, subscription_name: str, **kwargs) -> 'ServiceBusReceiver': ...
    def close(self) -> None: ...

Client Management

Message Operations

Comprehensive message sending and receiving operations including batch processing, message scheduling, and various settlement patterns for reliable message processing.

class ServiceBusSender:
    def send_messages(self, message, **kwargs) -> None: ...
    def schedule_messages(self, messages, schedule_time_utc, **kwargs) -> List[int]: ...
    def cancel_scheduled_messages(self, sequence_numbers, **kwargs) -> None: ...
    def create_message_batch(self, max_size_in_bytes=None) -> 'ServiceBusMessageBatch': ...

class ServiceBusReceiver:
    def receive_messages(self, max_message_count=1, max_wait_time=None) -> List['ServiceBusReceivedMessage']: ...
    def peek_messages(self, max_message_count=1, **kwargs) -> List['ServiceBusReceivedMessage']: ...
    def complete_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
    def abandon_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
    def defer_message(self, message: 'ServiceBusReceivedMessage') -> None: ...
    def dead_letter_message(self, message: 'ServiceBusReceivedMessage', reason=None, error_description=None) -> None: ...

Message Operations

Session Management

Stateful messaging through sessions enabling ordered processing, session state management, and advanced messaging patterns for applications requiring message correlation and ordered delivery.

class ServiceBusSession:
    @property
    def session_id(self) -> str: ...
    @property
    def locked_until_utc(self) -> Optional[datetime]: ...
    def get_state(self, **kwargs) -> bytes: ...
    def set_state(self, state, **kwargs) -> None: ...
    def renew_lock(self, **kwargs) -> datetime: ...

Session Management

Administrative Operations

Complete Service Bus entity management including queues, topics, subscriptions, and rules. Supports creation, configuration, monitoring, and deletion of messaging entities.

class ServiceBusAdministrationClient:
    def __init__(self, fully_qualified_namespace: str, credential, **kwargs): ...
    @classmethod
    def from_connection_string(cls, conn_str: str, **kwargs) -> 'ServiceBusAdministrationClient': ...
    
    # Queue Management
    def create_queue(self, queue_name: str, **kwargs) -> 'QueueProperties': ...
    def get_queue(self, queue_name: str, **kwargs) -> 'QueueProperties': ...
    def delete_queue(self, queue_name: str, **kwargs) -> None: ...
    def list_queues(self, **kwargs): ...
    
    # Topic Management
    def create_topic(self, topic_name: str, **kwargs) -> 'TopicProperties': ...
    def get_topic(self, topic_name: str, **kwargs) -> 'TopicProperties': ...
    def delete_topic(self, topic_name: str, **kwargs) -> None: ...
    
    # Subscription Management
    def create_subscription(self, topic_name: str, subscription_name: str, **kwargs) -> 'SubscriptionProperties': ...
    def delete_subscription(self, topic_name: str, subscription_name: str, **kwargs) -> None: ...

Administrative Operations

Message Classes and Types

Message data structures, properties, and utility classes for creating, configuring, and processing Service Bus messages with full metadata support.

class ServiceBusMessage:
    def __init__(self, body, **kwargs): ...
    @property
    def body(self): ...
    @property
    def application_properties(self) -> Optional[Dict]: ...
    @property
    def session_id(self) -> Optional[str]: ...
    @property
    def message_id(self) -> Optional[str]: ...
    @property
    def scheduled_enqueue_time_utc(self) -> Optional[datetime]: ...

class ServiceBusReceivedMessage(ServiceBusMessage):
    @property
    def delivery_count(self) -> Optional[int]: ...
    @property
    def enqueued_time_utc(self) -> Optional[datetime]: ...
    @property
    def sequence_number(self) -> Optional[int]: ...
    @property
    def lock_token(self) -> Optional[Union[uuid.UUID, str]]: ...
    @property
    def locked_until_utc(self) -> Optional[datetime]: ...

class ServiceBusMessageBatch:
    @property
    def max_size_in_bytes(self) -> int: ...
    @property
    def size_in_bytes(self) -> int: ...
    def add_message(self, message) -> None: ...

Message Classes and Types

Constants and Enums

Enumeration values and constants for configuring Service Bus behavior including receive modes, message states, transport types, and entity sub-queues.

class ServiceBusReceiveMode(str, Enum):
    PEEK_LOCK = "peeklock"
    RECEIVE_AND_DELETE = "receiveanddelete"

class ServiceBusMessageState(int, Enum):
    ACTIVE = 0
    DEFERRED = 1
    SCHEDULED = 2

class ServiceBusSubQueue(str, Enum):
    DEAD_LETTER = "deadletter"
    TRANSFER_DEAD_LETTER = "transferdeadletter"

class TransportType(Enum):
    Amqp = "Amqp"
    AmqpOverWebsocket = "AmqpOverWebsocket"

NEXT_AVAILABLE_SESSION: ServiceBusSessionFilter

Constants and Enums

Lock Renewal

Automatic lock renewal for messages and sessions to prevent lock expiration during long processing operations. Essential for production scenarios requiring extended message processing time.

class AutoLockRenewer:
    def __init__(
        self,
        max_lock_renewal_duration: float = 300,
        on_lock_renew_failure: Optional[LockRenewFailureCallback] = None,
        executor: Optional[ThreadPoolExecutor] = None,
        max_workers: Optional[int] = None
    ): ...
    
    def register(
        self,
        receiver: ServiceBusReceiver,
        renewable: Union[ServiceBusReceivedMessage, ServiceBusSession],
        max_lock_renewal_duration: Optional[float] = None
    ) -> None: ...
    
    def close(self, wait: bool = True) -> None: ...
    def __enter__(self) -> 'AutoLockRenewer': ...
    def __exit__(self, *args) -> None: ...

Exception Handling

Comprehensive exception hierarchy for handling Service Bus errors including connection issues, authentication failures, message processing errors, and service-side exceptions.

class ServiceBusError(AzureError):
    def __init__(self, message, **kwargs): ...

# Connection and Communication Errors
class ServiceBusConnectionError(ServiceBusError): ...
class ServiceBusCommunicationError(ServiceBusError): ...
class ServiceBusAuthenticationError(ServiceBusError): ...
class ServiceBusAuthorizationError(ServiceBusError): ...

# Message Processing Errors
class MessageAlreadySettled(ValueError): ...
class MessageLockLostError(ServiceBusError): ...
class MessageNotFoundError(ServiceBusError): ...
class MessageSizeExceededError(ServiceBusError, ValueError): ...

# Session Errors
class SessionLockLostError(ServiceBusError): ...
class SessionCannotBeLockedError(ServiceBusError): ...

# Auto Lock Renewal Errors
class AutoLockRenewFailed(ServiceBusError): ...
class AutoLockRenewTimeout(ServiceBusError): ...

# Service Errors
class MessagingEntityNotFoundError(ServiceBusError): ...
class MessagingEntityDisabledError(ServiceBusError): ...
class ServiceBusQuotaExceededError(ServiceBusError): ...
class ServiceBusServerBusyError(ServiceBusError): ...
class OperationTimeoutError(ServiceBusError): ...

Exception Handling

Types

# Type aliases for common parameter types
MessageTypes = Union[ServiceBusMessage, AmqpAnnotatedMessage, List[Union[ServiceBusMessage, AmqpAnnotatedMessage]]]
PrimitiveTypes = Union[int, float, str, bool, bytes, None]
NextAvailableSessionType = ServiceBusSessionFilter
LockRenewFailureCallback = Callable[[Union[ServiceBusSession, ServiceBusReceivedMessage], Optional[Exception]], None]

# Credential types (from azure-core and azure-identity)
TokenCredential = Any  # azure.core.credentials.TokenCredential
AzureSasCredential = Any  # azure.core.credentials.AzureSasCredential 
AzureNamedKeyCredential = Any  # azure.core.credentials.AzureNamedKeyCredential

# Utility functions
def parse_connection_string(conn_str: str) -> ServiceBusConnectionStringProperties: ...

# Connection string properties
class ServiceBusConnectionStringProperties:
    @property
    def endpoint(self) -> str: ...
    @property
    def entity_path(self) -> Optional[str]: ...
    @property
    def shared_access_key_name(self) -> Optional[str]: ...
    @property
    def shared_access_key(self) -> Optional[str]: ...
    @property
    def shared_access_signature(self) -> Optional[str]: ...