CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-azure-servicebus

Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.

Overall
score

92%

Overview
Eval results
Files

docs/client-management.md

Client Management

Core client functionality for establishing connections to Azure Service Bus and creating messaging components. Provides connection string parsing, credential management, and factory methods for senders and receivers.

Capabilities

ServiceBusClient Initialization

Creates the main Service Bus client with connection credentials and configuration options.

class ServiceBusClient:
    def __init__(
        self,
        fully_qualified_namespace: str,
        credential: Union[TokenCredential, AzureSasCredential, AzureNamedKeyCredential],
        *,
        retry_total: int = 3,
        retry_backoff_factor: float = 0.8,
        retry_backoff_max: float = 120,
        retry_mode: str = "exponential",
        **kwargs
    ):
        """
        Create a Service Bus client.

        All retry options are keyword-only.

        Parameters:
        - fully_qualified_namespace: Host name of the Service Bus namespace,
          given without a scheme (e.g. "myservicebus.servicebus.windows.net")
        - credential: Authentication credential (a TokenCredential,
          AzureSasCredential, or AzureNamedKeyCredential)
        - retry_total: Maximum number of retry attempts (default 3)
        - retry_backoff_factor: Backoff factor for retry delays (default 0.8)
        - retry_backoff_max: Maximum backoff time in seconds (default 120)
        - retry_mode: Retry strategy, "exponential" (default) or "fixed"
        - **kwargs: Additional keyword options (not enumerated here)
        """

Usage Example

from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential

# Option 1: authenticate with an Azure Active Directory credential.
# The namespace is passed as a bare host name (no "sb://" scheme).
credential = DefaultAzureCredential()
client = ServiceBusClient(
    fully_qualified_namespace="myservicebus.servicebus.windows.net",
    credential=credential
)

# Option 2 (alternative to option 1, easier for development): build the
# client from a connection string. The trailing "..." marks where the rest
# of the connection string has been elided.
client = ServiceBusClient.from_connection_string(
    "Endpoint=sb://myservicebus.servicebus.windows.net/;SharedAccessKeyName=..."
)

Connection String Factory

Creates a ServiceBusClient instance from a connection string with automatic credential parsing.

@classmethod
def from_connection_string(
    cls,
    conn_str: str,
    **kwargs
) -> ServiceBusClient:
    """
    Create a Service Bus client from a connection string.

    This is an alternate constructor: call it on the class rather than on
    an existing instance.

    Parameters:
    - conn_str: Service Bus connection string, e.g.
      "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=..."
    - **kwargs: Additional configuration options

    Returns:
    ServiceBusClient instance
    """

Client Properties

Access to client configuration and connection information.

@property
def fully_qualified_namespace(self) -> str:
    """The fully qualified host name of the Service Bus namespace."""

Sender Factory Methods

Create message senders for queues and topics with configuration options.

def get_queue_sender(
    self,
    queue_name: str,
    *,
    client_identifier: Optional[str] = None,
    socket_timeout: Optional[float] = None,
    **kwargs
) -> ServiceBusSender:
    """
    Create a sender for a specific queue.

    Everything after queue_name is keyword-only.

    Parameters:
    - queue_name: Name of the target queue
    - client_identifier: Optional identifier for the client connection
      (default None)
    - socket_timeout: Socket timeout in seconds (default None)
    - **kwargs: Additional keyword options (not enumerated here)

    Returns:
    ServiceBusSender instance for the specified queue
    """

def get_topic_sender(
    self,
    topic_name: str,
    *,
    client_identifier: Optional[str] = None,
    socket_timeout: Optional[float] = None,
    **kwargs
) -> ServiceBusSender:
    """
    Create a sender for a specific topic.

    Everything after topic_name is keyword-only.

    Parameters:
    - topic_name: Name of the target topic
    - client_identifier: Optional identifier for the client connection
      (default None)
    - socket_timeout: Socket timeout in seconds (default None)
    - **kwargs: Additional keyword options (not enumerated here)

    Returns:
    ServiceBusSender instance for the specified topic
    """

Receiver Factory Methods

Create message receivers for queues and subscriptions with extensive configuration options.

def get_queue_receiver(
    self,
    queue_name: str,
    *,
    client_identifier: Optional[str] = None,
    socket_timeout: Optional[float] = None,
    session_id: Optional[Union[str, NextAvailableSessionType]] = None,
    sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
    receive_mode: Union[ServiceBusReceiveMode, str] = ServiceBusReceiveMode.PEEK_LOCK,
    max_wait_time: Optional[float] = None,
    auto_lock_renewer: Optional[AutoLockRenewer] = None,
    prefetch_count: int = 0,
    **kwargs
) -> ServiceBusReceiver:
    """
    Create a receiver for a specific queue.

    Everything after queue_name is keyword-only.

    Parameters:
    - queue_name: Name of the source queue
    - client_identifier: Optional identifier for the client connection
      (default None)
    - socket_timeout: Socket timeout in seconds (default None)
    - session_id: Session ID for session-enabled queues, or
      NEXT_AVAILABLE_SESSION (default None)
    - sub_queue: Sub-queue type (DEAD_LETTER, TRANSFER_DEAD_LETTER), given
      as a ServiceBusSubQueue member or a string (default None)
    - receive_mode: Message receive mode (PEEK_LOCK or RECEIVE_AND_DELETE),
      given as a ServiceBusReceiveMode member or a string (default PEEK_LOCK)
    - max_wait_time: Maximum time to wait for messages (default None;
      NOTE(review): unit not stated here, presumably seconds - confirm)
    - auto_lock_renewer: AutoLockRenewer instance for automatic lock renewal
      (default None)
    - prefetch_count: Number of messages to prefetch (default 0, which
      disables prefetching)
    - **kwargs: Additional keyword options (not enumerated here)

    Returns:
    ServiceBusReceiver instance for the specified queue
    """

def get_subscription_receiver(
    self,
    topic_name: str,
    subscription_name: str,
    *,
    client_identifier: Optional[str] = None,
    socket_timeout: Optional[float] = None,
    session_id: Optional[Union[str, NextAvailableSessionType]] = None,
    sub_queue: Optional[Union[ServiceBusSubQueue, str]] = None,
    receive_mode: Union[ServiceBusReceiveMode, str] = ServiceBusReceiveMode.PEEK_LOCK,
    max_wait_time: Optional[float] = None,
    auto_lock_renewer: Optional[AutoLockRenewer] = None,
    prefetch_count: int = 0,
    **kwargs
) -> ServiceBusReceiver:
    """
    Create a receiver for a specific subscription.

    Everything after subscription_name is keyword-only.

    Parameters:
    - topic_name: Name of the source topic
    - subscription_name: Name of the subscription
    - client_identifier: Optional identifier for the client connection
      (default None)
    - socket_timeout: Socket timeout in seconds (default None)
    - session_id: Session ID for session-enabled subscriptions, or
      NEXT_AVAILABLE_SESSION (default None)
    - sub_queue: Sub-queue type (DEAD_LETTER, TRANSFER_DEAD_LETTER), given
      as a ServiceBusSubQueue member or a string (default None)
    - receive_mode: Message receive mode (PEEK_LOCK or RECEIVE_AND_DELETE),
      given as a ServiceBusReceiveMode member or a string (default PEEK_LOCK)
    - max_wait_time: Maximum time to wait for messages (default None;
      NOTE(review): unit not stated here, presumably seconds - confirm)
    - auto_lock_renewer: AutoLockRenewer instance for automatic lock renewal
      (default None)
    - prefetch_count: Number of messages to prefetch (default 0, which
      disables prefetching)
    - **kwargs: Additional keyword options (not enumerated here)

    Returns:
    ServiceBusReceiver instance for the specified subscription
    """

Usage Example

from azure.servicebus import ServiceBusClient, ServiceBusReceiveMode, NEXT_AVAILABLE_SESSION

# "your_connection_string" is a placeholder for a real connection string.
client = ServiceBusClient.from_connection_string("your_connection_string")

# Create a basic queue receiver; every option keeps its default
# (PEEK_LOCK receive mode, prefetch_count=0).
receiver = client.get_queue_receiver("my-queue")

# Create a session-enabled queue receiver for the next available session,
# instead of naming a specific session ID.
session_receiver = client.get_queue_receiver(
    "my-session-queue",
    session_id=NEXT_AVAILABLE_SESSION
)

# Create a subscription receiver with custom configuration:
# RECEIVE_AND_DELETE instead of the default PEEK_LOCK, and a prefetch of 10.
subscription_receiver = client.get_subscription_receiver(
    topic_name="my-topic",
    subscription_name="my-subscription",
    receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
    prefetch_count=10
)

Resource Management

Properly close client connections and release resources.

def close(self) -> None:
    """
    Close the client and release all associated resources.

    Call this once the client is no longer needed so that network
    connections and other resources are cleaned up properly.
    """

Context Manager Support

ServiceBusClient supports context manager protocol for automatic resource cleanup.

# Entering the `with` block yields a ServiceBusClient to bind with `as`.
def __enter__(self) -> ServiceBusClient: ...
# Runs when the `with` block exits, which is where the automatic cleanup
# happens. It is annotated as returning None, so an exception raised inside
# the block is not suppressed.
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

Usage Example

# Imports included so the example runs on its own, like the other usage
# examples on this page.
from azure.servicebus import ServiceBusClient, ServiceBusMessage

# Automatic resource cleanup with context manager
with ServiceBusClient.from_connection_string("your_connection_string") as client:
    # The sender is closed as soon as its `with` block ends.
    with client.get_queue_sender("my-queue") as sender:
        sender.send_messages(ServiceBusMessage("Hello"))

    # Receive up to 5 messages and complete each one after handling it.
    with client.get_queue_receiver("my-queue") as receiver:
        messages = receiver.receive_messages(max_message_count=5)
        for message in messages:
            receiver.complete_message(message)
# Client and all senders/receivers are automatically closed

Connection String Parsing Utilities

Parse and work with Service Bus connection strings directly.

def parse_connection_string(conn_str: str) -> ServiceBusConnectionStringProperties:
    """
    Parse a Service Bus connection string into its components.

    Parameters:
    - conn_str: Service Bus connection string

    Returns:
    ServiceBusConnectionStringProperties exposing the parsed components as
    read-only properties: endpoint, entity_path, shared_access_key_name,
    shared_access_key and shared_access_signature
    """

class ServiceBusConnectionStringProperties:
    """Parsed components of a Service Bus connection string.

    Only endpoint is typed as a plain str; every other component is
    Optional and may be None.
    """
    # The endpoint component; the only property not typed as Optional.
    @property
    def endpoint(self) -> str: ...
    # Entity path component, or None.
    @property
    def entity_path(self) -> Optional[str]: ...
    # Shared access key name component, or None.
    @property
    def shared_access_key_name(self) -> Optional[str]: ...
    # Shared access key component, or None.
    @property
    def shared_access_key(self) -> Optional[str]: ...
    # Shared access signature component, or None.
    @property
    def shared_access_signature(self) -> Optional[str]: ...

Asynchronous Client Management

For asynchronous operations, use the async version of ServiceBusClient from the azure.servicebus.aio module.

from azure.servicebus.aio import ServiceBusClient

class ServiceBusClient:
    # Mirrors the sync client's API. Closing the client and the
    # context-manager protocol are coroutine-based (`async with` / `await`).
    # Factory methods such as get_queue_sender() and get_queue_receiver() are
    # called without `await`; the senders and receivers they return are then
    # used with `async with` and `await`.
    async def __aenter__(self) -> ServiceBusClient: ...
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
    async def close(self) -> None: ...

Usage Example

import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage

async def main():
    # `async with` closes the client, sender and receiver automatically when
    # each block exits.
    async with ServiceBusClient.from_connection_string("your_connection_string") as client:
        # The factory call itself is not awaited; only the send is.
        async with client.get_queue_sender("my-queue") as sender:
            await sender.send_messages(ServiceBusMessage("Hello Async"))

        # Receive up to 5 messages and complete each one after handling it.
        async with client.get_queue_receiver("my-queue") as receiver:
            messages = await receiver.receive_messages(max_message_count=5)
            for message in messages:
                await receiver.complete_message(message)

# Run the coroutine on a fresh event loop.
asyncio.run(main())

Install with Tessl CLI

npx tessl i tessl/pypi-azure-servicebus

docs

administrative-operations.md

client-management.md

constants-enums.md

exception-handling.md

index.md

message-operations.md

message-types.md

session-management.md

tile.json