Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.
Overall
score
92%
Core client functionality for establishing connections to Azure Service Bus and creating messaging components. Provides connection string parsing, credential management, and factory methods for senders and receivers.
Creates the main Service Bus client with connection credentials and configuration options.
class ServiceBusClient:
def __init__(
self,
fully_qualified_namespace: str,
credential: Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential],
*,
retry_total: int = 3,
retry_backoff_factor: float = 0.8,
retry_backoff_max: float = 120,
retry_mode: str = "exponential",
**kwargs
):
"""
Create a Service Bus client.
Parameters:
- fully_qualified_namespace: The fully qualified Service Bus namespace URL
- credential: Authentication credential
- retry_total: Maximum number of retry attempts
- retry_backoff_factor: Backoff factor for retry delays
- retry_backoff_max: Maximum backoff time in seconds
- retry_mode: Retry strategy ("exponential" or "fixed")
"""from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential
# Using Azure Active Directory credential
credential = DefaultAzureCredential()
client = ServiceBusClient(
fully_qualified_namespace="myservicebus.servicebus.windows.net",
credential=credential
)
# Using connection string (easier for development)
client = ServiceBusClient.from_connection_string(
"Endpoint=sb://myservicebus.servicebus.windows.net/;SharedAccessKeyName=..."
)Creates a ServiceBusClient instance from a connection string with automatic credential parsing.
@classmethod
def from_connection_string(
cls,
conn_str: str,
**kwargs
) -> ServiceBusClient:
"""
Create a Service Bus client from a connection string.
Parameters:
- conn_str: Service Bus connection string
- **kwargs: Additional configuration options
Returns:
ServiceBusClient instance
"""Access to client configuration and connection information.
@property
def fully_qualified_namespace(self) -> str:
"""The fully qualified namespace URL of the Service Bus."""Create message senders for queues and topics with configuration options.
def get_queue_sender(
self,
queue_name: str,
*,
client_identifier: Optional[str] = None,
socket_timeout: Optional[float] = None,
**kwargs
) -> ServiceBusSender:
"""
Create a sender for a specific queue.
Parameters:
- queue_name: Name of the target queue
- client_identifier: Optional identifier for the client connection
- socket_timeout: Socket timeout in seconds
Returns:
ServiceBusSender instance for the specified queue
"""
def get_topic_sender(
self,
topic_name: str,
*,
client_identifier: Optional[str] = None,
socket_timeout: Optional[float] = None,
**kwargs
) -> ServiceBusSender:
"""
Create a sender for a specific topic.
Parameters:
- topic_name: Name of the target topic
- client_identifier: Optional identifier for the client connection
- socket_timeout: Socket timeout in seconds
Returns:
ServiceBusSender instance for the specified topic
"""Create message receivers for queues and subscriptions with extensive configuration options.
def get_queue_receiver(
self,
queue_name: str,
*,
client_identifier: Optional[str] = None,
socket_timeout: Optional[float] = None,
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[ServiceBusReceiveMode, str] = ServiceBusReceiveMode.PEEK_LOCK,
max_wait_time: Optional[float] = None,
auto_lock_renewer: Optional[AutoLockRenewer] = None,
prefetch_count: int = 0,
**kwargs
) -> ServiceBusReceiver:
"""
Create a receiver for a specific queue.
Parameters:
- queue_name: Name of the source queue
- client_identifier: Optional identifier for the client connection
- socket_timeout: Socket timeout in seconds
- session_id: Session ID for session-enabled queues, or NEXT_AVAILABLE_SESSION
- sub_queue: Sub-queue type (DEAD_LETTER, TRANSFER_DEAD_LETTER)
- receive_mode: Message receive mode (PEEK_LOCK or RECEIVE_AND_DELETE)
- max_wait_time: Maximum time to wait for messages
- auto_lock_renewer: AutoLockRenewer instance for automatic lock renewal
- prefetch_count: Number of messages to prefetch (0 disables prefetching)
Returns:
ServiceBusReceiver instance for the specified queue
"""
def get_subscription_receiver(
self,
topic_name: str,
subscription_name: str,
*,
client_identifier: Optional[str] = None,
socket_timeout: Optional[float] = None,
session_id: Optional[Union[str, NextAvailableSessionType]] = None,
sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
receive_mode: Union[ServiceBusReceiveMode, str] = ServiceBusReceiveMode.PEEK_LOCK,
max_wait_time: Optional[float] = None,
auto_lock_renewer: Optional[AutoLockRenewer] = None,
prefetch_count: int = 0,
**kwargs
) -> ServiceBusReceiver:
"""
Create a receiver for a specific subscription.
Parameters:
- topic_name: Name of the source topic
- subscription_name: Name of the subscription
- client_identifier: Optional identifier for the client connection
- socket_timeout: Socket timeout in seconds
- session_id: Session ID for session-enabled subscriptions, or NEXT_AVAILABLE_SESSION
- sub_queue: Sub-queue type (DEAD_LETTER, TRANSFER_DEAD_LETTER)
- receive_mode: Message receive mode (PEEK_LOCK or RECEIVE_AND_DELETE)
- max_wait_time: Maximum time to wait for messages
- auto_lock_renewer: AutoLockRenewer instance for automatic lock renewal
- prefetch_count: Number of messages to prefetch (0 disables prefetching)
Returns:
ServiceBusReceiver instance for the specified subscription
"""from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode, NEXT_AVAILABLE_SESSION
client = ServiceBusClient.from_connection_string("your_connection_string")
# Create a basic queue receiver
receiver = client.get_queue_receiver("my-queue")
# Create a session-enabled queue receiver for next available session
session_receiver = client.get_queue_receiver(
"my-session-queue",
session_id=NEXT_AVAILABLE_SESSION
)
# Create a subscription receiver with custom configuration
subscription_receiver = client.get_subscription_receiver(
topic_name="my-topic",
subscription_name="my-subscription",
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
prefetch_count=10
)Properly close client connections and release resources.
def close(self) -> None:
"""
Close the client and release all associated resources.
This should be called when the client is no longer needed to ensure
proper cleanup of network connections and other resources.
"""ServiceBusClient supports context manager protocol for automatic resource cleanup.
def __enter__(self) -> ServiceBusClient: ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...# Automatic resource cleanup with context manager
with ServiceBusClient.from_connection_string("your_connection_string") as client:
with client.get_queue_sender("my-queue") as sender:
sender.send_messages(ServiceBusMessage("Hello"))
with client.get_queue_receiver("my-queue") as receiver:
messages = receiver.receive_messages(max_message_count=5)
for message in messages:
receiver.complete_message(message)
# Client and all senders/receivers are automatically closedParse and work with Service Bus connection strings directly.
def parse_connection_string(conn_str: str) -> ServiceBusConnectionStringProperties:
"""
Parse a Service Bus connection string into its components.
Parameters:
- conn_str: Service Bus connection string
Returns:
ServiceBusConnectionStringProperties with parsed components
"""
class ServiceBusConnectionStringProperties:
"""Parsed components of a Service Bus connection string."""
@property
def endpoint(self) -> str: ...
@property
def entity_path(self) -> Optional[str]: ...
@property
def shared_access_key_name(self) -> Optional[str]: ...
@property
def shared_access_key(self) -> Optional[str]: ...
@property
def shared_access_signature(self) -> Optional[str]: ...For asynchronous operations, use the async version of ServiceBusClient from the azure.servicebus.aio module.
from azure.servicebus.aio import ServiceBusClient
class ServiceBusClient:
# Same methods as sync version but with async/await
async def __aenter__(self) -> ServiceBusClient: ...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
async def close(self) -> None: ...import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
async def main():
async with ServiceBusClient.from_connection_string("your_connection_string") as client:
async with client.get_queue_sender("my-queue") as sender:
await sender.send_messages(ServiceBusMessage("Hello Async"))
async with client.get_queue_receiver("my-queue") as receiver:
messages = await receiver.receive_messages(max_message_count=5)
for message in messages:
await receiver.complete_message(message)
asyncio.run(main())Install with Tessl CLI
npx tessl i tessl/pypi-azure-servicebusdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10