CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure-servicebus

Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.

Overall
score

92%

Overview
Eval results
Files

docs/administrative-operations.md

Administrative Operations

Complete Service Bus entity management including queues, topics, subscriptions, and rules. Supports creation, configuration, monitoring, and deletion of messaging entities.

Capabilities

ServiceBusAdministrationClient Initialization

Create an administration client for managing Service Bus entities.

class ServiceBusAdministrationClient:
    """Entry point for managing Service Bus entities (signature reference only)."""

    def __init__(
        self,
        fully_qualified_namespace: str,
        credential: TokenCredential,
        *,
        api_version: Union[str, ApiVersion] = DEFAULT_VERSION,
        **kwargs
    ):
        """
        Create a Service Bus administration client.

        Parameters:
        - fully_qualified_namespace: Fully qualified host name of the Service Bus
          namespace, e.g. "myservicebus.servicebus.windows.net" (a bare host name,
          as in the usage example, not a URL with a scheme)
        - credential: Token credential used for authentication (Azure Active Directory)
        - api_version: Service Bus management API version to use; keyword-only,
          defaults to DEFAULT_VERSION
        - **kwargs: Additional keyword arguments (not described in this reference)
        """

    @classmethod
    def from_connection_string(
        cls,
        conn_str: str,
        **kwargs
    ) -> 'ServiceBusAdministrationClient':
        """
        Create an administration client from a connection string.

        Parameters:
        - conn_str: Service Bus connection string with management permissions,
          in the "Endpoint=sb://<namespace host>/;SharedAccessKeyName=..." form
          shown in the usage example
        - **kwargs: Additional keyword arguments (not described in this reference)

        Returns:
        ServiceBusAdministrationClient instance
        """

Usage Example

from azure.servicebus.management import ServiceBusAdministrationClient
from azure.identity import DefaultAzureCredential

# Option 1: authenticate with an Azure Active Directory credential.
# The namespace is passed as a bare host name, followed by the credential.
credential = DefaultAzureCredential()
namespace_host = "myservicebus.servicebus.windows.net"
admin_client = ServiceBusAdministrationClient(namespace_host, credential)

# Option 2: build the client from a connection string instead.
connection_string = "Endpoint=sb://myservicebus.servicebus.windows.net/;SharedAccessKeyName=..."
admin_client = ServiceBusAdministrationClient.from_connection_string(connection_string)

Queue Management

Create, configure, monitor, and delete Service Bus queues.

def create_queue(
    self,
    queue_name: str,
    **kwargs
) -> QueueProperties:
    """
    Create a new queue.

    Parameters:
    - queue_name: Name of the queue to create
    - **kwargs: Queue configuration options (max_size_in_megabytes,
      default_message_time_to_live, dead_lettering_on_message_expiration,
      max_delivery_count, etc.)

    Returns:
    QueueProperties with the created queue configuration

    Raises:
    - azure.core.exceptions.ResourceExistsError: If a queue with this name already exists
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def get_queue(
    self,
    queue_name: str,
    **kwargs
) -> QueueProperties:
    """
    Get queue properties and configuration.

    Parameters:
    - queue_name: Name of the queue

    Returns:
    QueueProperties with queue configuration

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the queue does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def get_queue_runtime_properties(
    self,
    queue_name: str,
    **kwargs
) -> QueueRuntimeProperties:
    """
    Get queue runtime information and metrics (message counts, size, timestamps).

    Parameters:
    - queue_name: Name of the queue

    Returns:
    QueueRuntimeProperties with runtime metrics

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the queue does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def update_queue(
    self,
    queue: Union[QueueProperties, Mapping[str, Any]],
    **kwargs
) -> None:
    """
    Update queue configuration.

    The usual flow is to fetch the current settings with get_queue, modify
    the returned object, and pass it back here.

    Parameters:
    - queue: QueueProperties object or dictionary with queue settings

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the queue does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def delete_queue(
    self,
    queue_name: str,
    **kwargs
) -> None:
    """
    Delete a queue.

    Parameters:
    - queue_name: Name of the queue to delete

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the queue does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def list_queues(self, **kwargs) -> ItemPaged[QueueProperties]:
    """
    List all queues in the namespace.

    Returns:
    ItemPaged iterator of QueueProperties objects

    Raises:
    - azure.core.exceptions.HttpResponseError: For failures reported by the service
    """

def list_queues_runtime_properties(self, **kwargs) -> ItemPaged[QueueRuntimeProperties]:
    """
    List runtime properties for all queues.

    Returns:
    ItemPaged iterator of QueueRuntimeProperties objects

    Raises:
    - azure.core.exceptions.HttpResponseError: For failures reported by the service
    """

Usage Example

from azure.servicebus.management import ServiceBusAdministrationClient, QueueProperties
from datetime import timedelta

# Use the client as a context manager so it is closed when the block ends.
with ServiceBusAdministrationClient.from_connection_string("your_connection_string") as admin_client:
    # Create a queue with custom settings
    queue_properties = admin_client.create_queue(
        "my-new-queue",
        max_size_in_megabytes=2048,
        default_message_time_to_live=timedelta(days=7),
        # The keyword is `dead_lettering_on_message_expiration` (no `enable_` prefix)
        dead_lettering_on_message_expiration=True,
        max_delivery_count=5,
    )
    print(f"Created queue: {queue_properties.name}")

    # Get queue information
    queue_info = admin_client.get_queue("my-new-queue")
    print(f"Queue max size: {queue_info.max_size_in_megabytes} MB")

    # Get runtime metrics
    runtime_info = admin_client.get_queue_runtime_properties("my-new-queue")
    print(f"Active messages: {runtime_info.active_message_count}")
    print(f"Dead letter messages: {runtime_info.dead_letter_message_count}")

    # List all queues
    for queue in admin_client.list_queues():
        print(f"Queue: {queue.name}, Size: {queue.max_size_in_megabytes} MB")

    # Update queue settings: modify the fetched properties and send them back
    queue_info.max_delivery_count = 10
    admin_client.update_queue(queue_info)

    # Delete queue when no longer needed
    admin_client.delete_queue("my-new-queue")

Topic Management

Create, configure, monitor, and delete Service Bus topics.

def create_topic(
    self,
    topic_name: str,
    **kwargs
) -> TopicProperties:
    """
    Create a new topic.

    Parameters:
    - topic_name: Name of the topic to create
    - **kwargs: Topic configuration options (e.g. max_size_in_megabytes,
      enable_partitioning)

    Returns:
    TopicProperties with the created topic configuration

    Raises:
    - azure.core.exceptions.ResourceExistsError: If a topic with this name already exists
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def get_topic(
    self,
    topic_name: str,
    **kwargs
) -> TopicProperties:
    """
    Get topic properties and configuration.

    Parameters:
    - topic_name: Name of the topic

    Returns:
    TopicProperties with topic configuration

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def get_topic_runtime_properties(
    self,
    topic_name: str,
    **kwargs
) -> TopicRuntimeProperties:
    """
    Get topic runtime information and metrics.

    Parameters:
    - topic_name: Name of the topic

    Returns:
    TopicRuntimeProperties with runtime metrics

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def update_topic(
    self,
    topic: Union[TopicProperties, Mapping[str, Any]],
    **kwargs
) -> None:
    """
    Update topic configuration.

    The usual flow is to fetch the current settings with get_topic, modify
    the returned object, and pass it back here.

    Parameters:
    - topic: TopicProperties object or dictionary with topic settings

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def delete_topic(
    self,
    topic_name: str,
    **kwargs
) -> None:
    """
    Delete a topic and all its subscriptions.

    Parameters:
    - topic_name: Name of the topic to delete

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def list_topics(self, **kwargs) -> ItemPaged[TopicProperties]:
    """
    List all topics in the namespace.

    Returns:
    ItemPaged iterator of TopicProperties objects

    Raises:
    - azure.core.exceptions.HttpResponseError: For failures reported by the service
    """

def list_topics_runtime_properties(self, **kwargs) -> ItemPaged[TopicRuntimeProperties]:
    """
    List runtime properties for all topics.

    Returns:
    ItemPaged iterator of TopicRuntimeProperties objects

    Raises:
    - azure.core.exceptions.HttpResponseError: For failures reported by the service
    """

Subscription Management

Create, configure, monitor, and delete topic subscriptions.

def create_subscription(
    self,
    topic_name: str,
    subscription_name: str,
    **kwargs
) -> SubscriptionProperties:
    """
    Create a new subscription for a topic.

    Parameters:
    - topic_name: Name of the parent topic
    - subscription_name: Name of the subscription to create
    - **kwargs: Subscription configuration options (e.g.
      default_message_time_to_live, dead_lettering_on_message_expiration,
      max_delivery_count)

    Returns:
    SubscriptionProperties with the created subscription configuration

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic does not exist
    - azure.core.exceptions.ResourceExistsError: If the subscription already exists
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def get_subscription(
    self,
    topic_name: str,
    subscription_name: str,
    **kwargs
) -> SubscriptionProperties:
    """
    Get subscription properties and configuration.

    Parameters:
    - topic_name: Name of the parent topic
    - subscription_name: Name of the subscription

    Returns:
    SubscriptionProperties with subscription configuration

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic or subscription does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def get_subscription_runtime_properties(
    self,
    topic_name: str,
    subscription_name: str,
    **kwargs
) -> SubscriptionRuntimeProperties:
    """
    Get subscription runtime information and metrics.

    Parameters:
    - topic_name: Name of the parent topic
    - subscription_name: Name of the subscription

    Returns:
    SubscriptionRuntimeProperties with runtime metrics

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic or subscription does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def update_subscription(
    self,
    topic_name: str,
    subscription: Union[SubscriptionProperties, Mapping[str, Any]],
    **kwargs
) -> None:
    """
    Update subscription configuration.

    The usual flow is to fetch the current settings with get_subscription,
    modify the returned object, and pass it back here.

    Parameters:
    - topic_name: Name of the parent topic
    - subscription: SubscriptionProperties object or dictionary with subscription settings

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic or subscription does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def delete_subscription(
    self,
    topic_name: str,
    subscription_name: str,
    **kwargs
) -> None:
    """
    Delete a subscription.

    Parameters:
    - topic_name: Name of the parent topic
    - subscription_name: Name of the subscription to delete

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic or subscription does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def list_subscriptions(
    self,
    topic_name: str,
    **kwargs
) -> ItemPaged[SubscriptionProperties]:
    """
    List all subscriptions for a topic.

    Parameters:
    - topic_name: Name of the parent topic

    Returns:
    ItemPaged iterator of SubscriptionProperties objects

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the service reports that the topic does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def list_subscriptions_runtime_properties(
    self,
    topic_name: str,
    **kwargs
) -> ItemPaged[SubscriptionRuntimeProperties]:
    """
    List runtime properties for all subscriptions of a topic.

    Parameters:
    - topic_name: Name of the parent topic

    Returns:
    ItemPaged iterator of SubscriptionRuntimeProperties objects

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the service reports that the topic does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

Usage Example

from azure.servicebus.management import ServiceBusAdministrationClient
from datetime import timedelta

# Use the client as a context manager so it is closed when the block ends.
with ServiceBusAdministrationClient.from_connection_string("your_connection_string") as admin_client:
    # Create a topic
    topic_properties = admin_client.create_topic(
        "my-topic",
        max_size_in_megabytes=5120,
        enable_partitioning=True,
    )
    print(f"Created topic: {topic_properties.name}")

    # Create subscriptions for the topic
    subscription1 = admin_client.create_subscription(
        "my-topic",
        "subscription1",
        default_message_time_to_live=timedelta(hours=24),
        # The keyword is `dead_lettering_on_message_expiration` (no `enable_` prefix)
        dead_lettering_on_message_expiration=True,
    )

    subscription2 = admin_client.create_subscription(
        "my-topic",
        "subscription2",
        max_delivery_count=3,
    )

    # List all subscriptions
    for sub in admin_client.list_subscriptions("my-topic"):
        # SubscriptionProperties exposes the subscription's name as `name`
        print(f"Subscription: {sub.name}")

        # Get runtime metrics for each subscription
        runtime = admin_client.get_subscription_runtime_properties("my-topic", sub.name)
        print(f"  Active messages: {runtime.active_message_count}")
        print(f"  Dead letter messages: {runtime.dead_letter_message_count}")

Rule Management

Create, configure, and delete subscription rules for message filtering.

def create_rule(
    self,
    topic_name: str,
    subscription_name: str,
    rule_name: str,
    **kwargs
) -> RuleProperties:
    """
    Create a new rule for a subscription.

    Parameters:
    - topic_name: Name of the parent topic
    - subscription_name: Name of the subscription
    - rule_name: Name of the rule to create
    - **kwargs: Rule configuration: `filter` (e.g. SqlRuleFilter or
      CorrelationRuleFilter) and optional `action` (e.g. SqlRuleAction)

    Returns:
    RuleProperties with the created rule configuration

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic or subscription does not exist
    - azure.core.exceptions.ResourceExistsError: If the rule already exists
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def get_rule(
    self,
    topic_name: str,
    subscription_name: str,
    rule_name: str,
    **kwargs
) -> RuleProperties:
    """
    Get rule properties and configuration.

    Parameters:
    - topic_name: Name of the parent topic
    - subscription_name: Name of the subscription
    - rule_name: Name of the rule

    Returns:
    RuleProperties with rule configuration

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic, subscription, or rule does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def update_rule(
    self,
    topic_name: str,
    subscription_name: str,
    rule: Union[RuleProperties, Mapping[str, Any]],
    **kwargs
) -> None:
    """
    Update rule configuration.

    The usual flow is to fetch the current settings with get_rule, modify
    the returned object, and pass it back here.

    Parameters:
    - topic_name: Name of the parent topic
    - subscription_name: Name of the subscription
    - rule: RuleProperties object or dictionary with rule settings

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic, subscription, or rule does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def delete_rule(
    self,
    topic_name: str,
    subscription_name: str,
    rule_name: str,
    **kwargs
) -> None:
    """
    Delete a rule.

    Parameters:
    - topic_name: Name of the parent topic
    - subscription_name: Name of the subscription
    - rule_name: Name of the rule to delete

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the topic, subscription, or rule does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

def list_rules(
    self,
    topic_name: str,
    subscription_name: str,
    **kwargs
) -> ItemPaged[RuleProperties]:
    """
    List all rules for a subscription.

    Parameters:
    - topic_name: Name of the parent topic
    - subscription_name: Name of the subscription

    Returns:
    ItemPaged iterator of RuleProperties objects

    Raises:
    - azure.core.exceptions.ResourceNotFoundError: If the service reports that the topic or subscription does not exist
    - azure.core.exceptions.HttpResponseError: For other failures reported by the service
    """

Usage Example

from azure.servicebus.management import (
    ServiceBusAdministrationClient,
    SqlRuleFilter,
    CorrelationRuleFilter,
    SqlRuleAction
)

# Use the client as a context manager so it is closed when the block ends.
with ServiceBusAdministrationClient.from_connection_string("your_connection_string") as admin_client:
    # Create rules with different filter types
    # SQL filter with an action that rewrites a property on matching messages
    sql_rule = admin_client.create_rule(
        "my-topic",
        "subscription1",
        "high-priority-rule",
        filter=SqlRuleFilter("priority = 'high'"),
        action=SqlRuleAction("SET priority = 'processed'"),
    )

    # Correlation filter matching on correlation id and application properties
    correlation_rule = admin_client.create_rule(
        "my-topic",
        "subscription2",
        "order-rule",
        filter=CorrelationRuleFilter(
            correlation_id="order-12345",
            properties={"type": "order", "region": "west"},
        ),
    )

    # List and examine rules
    for rule in admin_client.list_rules("my-topic", "subscription1"):
        print(f"Rule: {rule.name}")
        print(f"Filter: {rule.filter}")
        if rule.action:
            print(f"Action: {rule.action}")

    # Delete a rule
    admin_client.delete_rule("my-topic", "subscription1", "high-priority-rule")

Namespace Operations

Get information about the Service Bus namespace.

def get_namespace_properties(self, **kwargs) -> NamespaceProperties:
    """
    Get namespace properties and information.

    Returns:
    NamespaceProperties with namespace information (name, messaging SKU,
    creation and modification timestamps)

    Raises:
    - azure.core.exceptions.HttpResponseError: For failures reported by the service
    """

Usage Example

from azure.servicebus.management import ServiceBusAdministrationClient

# Use the client as a context manager so it is closed when the block ends.
with ServiceBusAdministrationClient.from_connection_string("your_connection_string") as admin_client:
    # Get namespace information
    namespace_info = admin_client.get_namespace_properties()
    print(f"Namespace: {namespace_info.name}")
    print(f"SKU: {namespace_info.messaging_sku}")
    print(f"Created: {namespace_info.created_at_utc}")
    print(f"Modified: {namespace_info.modified_at_utc}")

Management Model Properties

QueueProperties

Configuration settings for Service Bus queues.

class QueueProperties:
    """
    Configuration settings of a Service Bus queue (attribute reference only).

    Returned by create_queue / get_queue / list_queues and accepted by
    update_queue. The attribute names below are also the keyword names
    accepted by create_queue.
    """

    @property
    def name(self) -> str: ...
    @property
    def max_size_in_megabytes(self) -> int: ...
    @property
    def default_message_time_to_live(self) -> timedelta: ...
    @property
    def lock_duration(self) -> timedelta: ...
    @property
    def max_delivery_count(self) -> int: ...
    # Named without an `enable_` prefix in the published model.
    @property
    def dead_lettering_on_message_expiration(self) -> bool: ...
    # Session support is exposed as `requires_session`.
    @property
    def requires_session(self) -> bool: ...
    # Duplicate detection is exposed as `requires_duplicate_detection`.
    @property
    def requires_duplicate_detection(self) -> bool: ...
    @property
    def duplicate_detection_history_time_window(self) -> timedelta: ...
    @property
    def enable_batched_operations(self) -> bool: ...
    @property
    def status(self) -> EntityStatus: ...
    @property
    def enable_partitioning(self) -> bool: ...
    @property
    def forward_to(self) -> Optional[str]: ...
    @property
    def forward_dead_lettered_messages_to(self) -> Optional[str]: ...
    @property
    def auto_delete_on_idle(self) -> timedelta: ...
    @property
    def authorization_rules(self) -> List[AuthorizationRule]: ...

QueueRuntimeProperties

Runtime metrics and information for Service Bus queues.

class QueueRuntimeProperties:
    """
    Runtime metrics of a Service Bus queue (attribute reference only).

    Returned by get_queue_runtime_properties and
    list_queues_runtime_properties. All values are read-only.
    """

    @property
    def name(self) -> str: ...
    # Total number of messages in the queue across all states.
    @property
    def total_message_count(self) -> int: ...
    @property
    def active_message_count(self) -> int: ...
    @property
    def dead_letter_message_count(self) -> int: ...
    @property
    def scheduled_message_count(self) -> int: ...
    @property
    def transfer_dead_letter_message_count(self) -> int: ...
    @property
    def transfer_message_count(self) -> int: ...
    @property
    def size_in_bytes(self) -> int: ...
    @property
    def created_at_utc(self) -> datetime: ...
    @property
    def updated_at_utc(self) -> datetime: ...
    @property
    def accessed_at_utc(self) -> datetime: ...

Asynchronous Administrative Operations

For asynchronous operations, use the async version from the azure.servicebus.aio.management module.

from azure.servicebus.aio.management import ServiceBusAdministrationClient

# The async client mirrors the sync API: create/get/update/delete methods are
# coroutines, while list_* methods return an async iterable (use `async for`).
class ServiceBusAdministrationClient:
    """Async administration client (abbreviated signature reference)."""

    async def create_queue(self, queue_name: str, **kwargs) -> QueueProperties: ...
    async def get_queue(self, queue_name: str, **kwargs) -> QueueProperties: ...
    async def delete_queue(self, queue_name: str, **kwargs) -> None: ...
    # Not a coroutine: call without `await` and iterate with `async for`.
    def list_queues(self, **kwargs) -> "AsyncItemPaged[QueueProperties]": ...
    # ... (remaining create/get/update/delete methods are coroutines;
    #      remaining list_* methods return async iterables)

Usage Example

import asyncio
from azure.servicebus.aio.management import ServiceBusAdministrationClient

async def manage_entities():
    """Create a queue, then print every queue with its active message count."""
    admin_client = ServiceBusAdministrationClient.from_connection_string("your_connection_string")

    # `async with` closes the client when the block exits, even on error.
    async with admin_client:
        # Create queue asynchronously
        created_queue = await admin_client.create_queue("async-queue")
        print(f"Created queue: {created_queue.name}")

        # List queues asynchronously; list_queues() is iterated with
        # `async for` rather than awaited.
        async for queue in admin_client.list_queues():
            print(f"Queue: {queue.name}")

            # Get runtime properties
            runtime = await admin_client.get_queue_runtime_properties(queue.name)
            print(f"  Active messages: {runtime.active_message_count}")

asyncio.run(manage_entities())

Install with Tessl CLI

npx tessl i tessl/pypi-azure-servicebus

docs

administrative-operations.md

client-management.md

constants-enums.md

exception-handling.md

index.md

message-operations.md

message-types.md

session-management.md

tile.json