Microsoft Azure Service Bus Client Library for Python providing comprehensive messaging capabilities for enterprise applications.
Overall score: 92%
Complete Service Bus entity management including queues, topics, subscriptions, and rules. Supports creation, configuration, monitoring, and deletion of messaging entities.
Create an administration client for managing Service Bus entities.
class ServiceBusAdministrationClient:
def __init__(
self,
fully_qualified_namespace: str,
credential: TokenCredential,
*,
api_version: Union[str, ApiVersion] = DEFAULT_VERSION,
**kwargs
):
"""
Create a Service Bus administration client.
Parameters:
- fully_qualified_namespace: The fully qualified Service Bus namespace URL
- credential: Azure credential for authentication (Azure Active Directory)
- api_version: Service Bus management API version to use
"""
@classmethod
def from_connection_string(
cls,
conn_str: str,
**kwargs
) -> 'ServiceBusAdministrationClient':
"""
Create administration client from connection string.
Parameters:
- conn_str: Service Bus connection string with management permissions
Returns:
ServiceBusAdministrationClient instance
"""
from azure.servicebus.management import ServiceBusAdministrationClient
from azure.identity import DefaultAzureCredential
# Using Azure Active Directory credential
credential = DefaultAzureCredential()
admin_client = ServiceBusAdministrationClient(
fully_qualified_namespace="myservicebus.servicebus.windows.net",
credential=credential
)
# Or using connection string
admin_client = ServiceBusAdministrationClient.from_connection_string(
"Endpoint=sb://myservicebus.servicebus.windows.net/;SharedAccessKeyName=..."
)
Create, configure, monitor, and delete Service Bus queues.
def create_queue(
self,
queue_name: str,
**kwargs
) -> QueueProperties:
"""
Create a new queue.
Parameters:
- queue_name: Name of the queue to create
- **kwargs: Queue configuration options (max_size_in_megabytes, default_message_time_to_live, etc.)
Returns:
QueueProperties with the created queue configuration
Raises:
- MessagingEntityAlreadyExistsError: If queue already exists
- ServiceBusError: For other Service Bus related errors
"""
def get_queue(
self,
queue_name: str,
**kwargs
) -> QueueProperties:
"""
Get queue properties and configuration.
Parameters:
- queue_name: Name of the queue
Returns:
QueueProperties with queue configuration
Raises:
- MessagingEntityNotFoundError: If queue does not exist
- ServiceBusError: For other Service Bus related errors
"""
def get_queue_runtime_properties(
self,
queue_name: str,
**kwargs
) -> QueueRuntimeProperties:
"""
Get queue runtime information and metrics.
Parameters:
- queue_name: Name of the queue
Returns:
QueueRuntimeProperties with runtime metrics
Raises:
- MessagingEntityNotFoundError: If queue does not exist
- ServiceBusError: For other Service Bus related errors
"""
def update_queue(
self,
queue: Union[QueueProperties, Mapping[str, Any]],
**kwargs
) -> None:
"""
Update queue configuration.
Parameters:
- queue: QueueProperties object or dictionary with queue settings
Raises:
- MessagingEntityNotFoundError: If queue does not exist
- ServiceBusError: For other Service Bus related errors
"""
def delete_queue(
self,
queue_name: str,
**kwargs
) -> None:
"""
Delete a queue.
Parameters:
- queue_name: Name of the queue to delete
Raises:
- MessagingEntityNotFoundError: If queue does not exist
- ServiceBusError: For other Service Bus related errors
"""
def list_queues(self, **kwargs) -> ItemPaged[QueueProperties]:
"""
List all queues in the namespace.
Returns:
ItemPaged iterator of QueueProperties objects
Raises:
- ServiceBusError: For Service Bus related errors
"""
def list_queues_runtime_properties(self, **kwargs) -> ItemPaged[QueueRuntimeProperties]:
"""
List runtime properties for all queues.
Returns:
ItemPaged iterator of QueueRuntimeProperties objects
Raises:
- ServiceBusError: For Service Bus related errors
"""
from azure.servicebus.management import ServiceBusAdministrationClient, QueueProperties
from datetime import timedelta

admin_client = ServiceBusAdministrationClient.from_connection_string("your_connection_string")

# Create a queue with custom settings.
# The dead-lettering switch is spelled `dead_lettering_on_message_expiration`
# in the Python SDK (there is no `enable_` prefix on this keyword).
queue_properties = admin_client.create_queue(
    "my-new-queue",
    max_size_in_megabytes=2048,
    default_message_time_to_live=timedelta(days=7),
    dead_lettering_on_message_expiration=True,
    max_delivery_count=5,
)
print(f"Created queue: {queue_properties.name}")

# Get queue information (static configuration)
queue_info = admin_client.get_queue("my-new-queue")
print(f"Queue max size: {queue_info.max_size_in_megabytes} MB")

# Get runtime metrics (message counts)
runtime_info = admin_client.get_queue_runtime_properties("my-new-queue")
print(f"Active messages: {runtime_info.active_message_count}")
print(f"Dead letter messages: {runtime_info.dead_letter_message_count}")

# List all queues
for queue in admin_client.list_queues():
    print(f"Queue: {queue.name}, Size: {queue.max_size_in_megabytes} MB")

# Update queue settings: modify the fetched properties object and send it back
queue_info.max_delivery_count = 10
admin_client.update_queue(queue_info)

# Delete queue when no longer needed
admin_client.delete_queue("my-new-queue")
Create, configure, monitor, and delete Service Bus topics.
def create_topic(
self,
topic_name: str,
**kwargs
) -> TopicProperties:
"""
Create a new topic.
Parameters:
- topic_name: Name of the topic to create
- **kwargs: Topic configuration options
Returns:
TopicProperties with the created topic configuration
Raises:
- MessagingEntityAlreadyExistsError: If topic already exists
- ServiceBusError: For other Service Bus related errors
"""
def get_topic(
self,
topic_name: str,
**kwargs
) -> TopicProperties:
"""
Get topic properties and configuration.
Parameters:
- topic_name: Name of the topic
Returns:
TopicProperties with topic configuration
Raises:
- MessagingEntityNotFoundError: If topic does not exist
- ServiceBusError: For other Service Bus related errors
"""
def get_topic_runtime_properties(
self,
topic_name: str,
**kwargs
) -> TopicRuntimeProperties:
"""
Get topic runtime information and metrics.
Parameters:
- topic_name: Name of the topic
Returns:
TopicRuntimeProperties with runtime metrics
Raises:
- MessagingEntityNotFoundError: If topic does not exist
- ServiceBusError: For other Service Bus related errors
"""
def update_topic(
self,
topic: Union[TopicProperties, Mapping[str, Any]],
**kwargs
) -> None:
"""
Update topic configuration.
Parameters:
- topic: TopicProperties object or dictionary with topic settings
Raises:
- MessagingEntityNotFoundError: If topic does not exist
- ServiceBusError: For other Service Bus related errors
"""
def delete_topic(
self,
topic_name: str,
**kwargs
) -> None:
"""
Delete a topic and all its subscriptions.
Parameters:
- topic_name: Name of the topic to delete
Raises:
- MessagingEntityNotFoundError: If topic does not exist
- ServiceBusError: For other Service Bus related errors
"""
def list_topics(self, **kwargs) -> ItemPaged[TopicProperties]:
"""
List all topics in the namespace.
Returns:
ItemPaged iterator of TopicProperties objects
Raises:
- ServiceBusError: For Service Bus related errors
"""
def list_topics_runtime_properties(self, **kwargs) -> ItemPaged[TopicRuntimeProperties]:
"""
List runtime properties for all topics.
Returns:
ItemPaged iterator of TopicRuntimeProperties objects
Raises:
- ServiceBusError: For Service Bus related errors
"""
Create, configure, monitor, and delete topic subscriptions.
def create_subscription(
self,
topic_name: str,
subscription_name: str,
**kwargs
) -> SubscriptionProperties:
"""
Create a new subscription for a topic.
Parameters:
- topic_name: Name of the parent topic
- subscription_name: Name of the subscription to create
- **kwargs: Subscription configuration options
Returns:
SubscriptionProperties with the created subscription configuration
Raises:
- MessagingEntityNotFoundError: If topic does not exist
- MessagingEntityAlreadyExistsError: If subscription already exists
- ServiceBusError: For other Service Bus related errors
"""
def get_subscription(
self,
topic_name: str,
subscription_name: str,
**kwargs
) -> SubscriptionProperties:
"""
Get subscription properties and configuration.
Parameters:
- topic_name: Name of the parent topic
- subscription_name: Name of the subscription
Returns:
SubscriptionProperties with subscription configuration
Raises:
- MessagingEntityNotFoundError: If topic or subscription does not exist
- ServiceBusError: For other Service Bus related errors
"""
def get_subscription_runtime_properties(
self,
topic_name: str,
subscription_name: str,
**kwargs
) -> SubscriptionRuntimeProperties:
"""
Get subscription runtime information and metrics.
Parameters:
- topic_name: Name of the parent topic
- subscription_name: Name of the subscription
Returns:
SubscriptionRuntimeProperties with runtime metrics
Raises:
- MessagingEntityNotFoundError: If topic or subscription does not exist
- ServiceBusError: For other Service Bus related errors
"""
def update_subscription(
self,
topic_name: str,
subscription: Union[SubscriptionProperties, Mapping[str, Any]],
**kwargs
) -> None:
"""
Update subscription configuration.
Parameters:
- topic_name: Name of the parent topic
- subscription: SubscriptionProperties object or dictionary with subscription settings
Raises:
- MessagingEntityNotFoundError: If topic or subscription does not exist
- ServiceBusError: For other Service Bus related errors
"""
def delete_subscription(
self,
topic_name: str,
subscription_name: str,
**kwargs
) -> None:
"""
Delete a subscription.
Parameters:
- topic_name: Name of the parent topic
- subscription_name: Name of the subscription to delete
Raises:
- MessagingEntityNotFoundError: If topic or subscription does not exist
- ServiceBusError: For other Service Bus related errors
"""
def list_subscriptions(
self,
topic_name: str,
**kwargs
) -> ItemPaged[SubscriptionProperties]:
"""
List all subscriptions for a topic.
Parameters:
- topic_name: Name of the parent topic
Returns:
ItemPaged iterator of SubscriptionProperties objects
Raises:
- MessagingEntityNotFoundError: If topic does not exist
- ServiceBusError: For Service Bus related errors
"""
def list_subscriptions_runtime_properties(
self,
topic_name: str,
**kwargs
) -> ItemPaged[SubscriptionRuntimeProperties]:
"""
List runtime properties for all subscriptions of a topic.
Parameters:
- topic_name: Name of the parent topic
Returns:
ItemPaged iterator of SubscriptionRuntimeProperties objects
Raises:
- MessagingEntityNotFoundError: If topic does not exist
- ServiceBusError: For Service Bus related errors
"""
from azure.servicebus.management import ServiceBusAdministrationClient
from datetime import timedelta
admin_client = ServiceBusAdministrationClient.from_connection_string("your_connection_string")
# Create a topic
topic_properties = admin_client.create_topic(
"my-topic",
max_size_in_megabytes=5120,
enable_partitioning=True
)
print(f"Created topic: {topic_properties.name}")
# Create subscriptions for the topic
subscription1 = admin_client.create_subscription(
"my-topic",
"subscription1",
default_message_time_to_live=timedelta(hours=24),
enable_dead_lettering_on_message_expiration=True
)
subscription2 = admin_client.create_subscription(
"my-topic",
"subscription2",
max_delivery_count=3
)
# List all subscriptions
for sub in admin_client.list_subscriptions("my-topic"):
print(f"Subscription: {sub.subscription_name}")
# Get runtime metrics for each subscription
runtime = admin_client.get_subscription_runtime_properties("my-topic", sub.subscription_name)
print(f" Active messages: {runtime.active_message_count}")
print(f" Dead letter messages: {runtime.dead_letter_message_count}")Create, configure, and delete subscription rules for message filtering.
def create_rule(
self,
topic_name: str,
subscription_name: str,
rule_name: str,
**kwargs
) -> RuleProperties:
"""
Create a new rule for a subscription.
Parameters:
- topic_name: Name of the parent topic
- subscription_name: Name of the subscription
- rule_name: Name of the rule to create
- **kwargs: Rule configuration (filter, action)
Returns:
RuleProperties with the created rule configuration
Raises:
- MessagingEntityNotFoundError: If topic or subscription does not exist
- MessagingEntityAlreadyExistsError: If rule already exists
- ServiceBusError: For other Service Bus related errors
"""
def get_rule(
self,
topic_name: str,
subscription_name: str,
rule_name: str,
**kwargs
) -> RuleProperties:
"""
Get rule properties and configuration.
Parameters:
- topic_name: Name of the parent topic
- subscription_name: Name of the subscription
- rule_name: Name of the rule
Returns:
RuleProperties with rule configuration
Raises:
- MessagingEntityNotFoundError: If topic, subscription, or rule does not exist
- ServiceBusError: For other Service Bus related errors
"""
def update_rule(
self,
topic_name: str,
subscription_name: str,
rule: Union[RuleProperties, Mapping[str, Any]],
**kwargs
) -> None:
"""
Update rule configuration.
Parameters:
- topic_name: Name of the parent topic
- subscription_name: Name of the subscription
- rule: RuleProperties object or dictionary with rule settings
Raises:
- MessagingEntityNotFoundError: If topic, subscription, or rule does not exist
- ServiceBusError: For other Service Bus related errors
"""
def delete_rule(
self,
topic_name: str,
subscription_name: str,
rule_name: str,
**kwargs
) -> None:
"""
Delete a rule.
Parameters:
- topic_name: Name of the parent topic
- subscription_name: Name of the subscription
- rule_name: Name of the rule to delete
Raises:
- MessagingEntityNotFoundError: If topic, subscription, or rule does not exist
- ServiceBusError: For other Service Bus related errors
"""
def list_rules(
self,
topic_name: str,
subscription_name: str,
**kwargs
) -> ItemPaged[RuleProperties]:
"""
List all rules for a subscription.
Parameters:
- topic_name: Name of the parent topic
- subscription_name: Name of the subscription
Returns:
ItemPaged iterator of RuleProperties objects
Raises:
- MessagingEntityNotFoundError: If topic or subscription does not exist
- ServiceBusError: For Service Bus related errors
"""
from azure.servicebus.management import (
ServiceBusAdministrationClient,
SqlRuleFilter,
CorrelationRuleFilter,
SqlRuleAction
)
admin_client = ServiceBusAdministrationClient.from_connection_string("your_connection_string")
# Create rules with different filter types
sql_rule = admin_client.create_rule(
"my-topic",
"subscription1",
"high-priority-rule",
filter=SqlRuleFilter("priority = 'high'"),
action=SqlRuleAction("SET priority = 'processed'")
)
correlation_rule = admin_client.create_rule(
"my-topic",
"subscription2",
"order-rule",
filter=CorrelationRuleFilter(
correlation_id="order-12345",
properties={"type": "order", "region": "west"}
)
)
# List and examine rules
for rule in admin_client.list_rules("my-topic", "subscription1"):
print(f"Rule: {rule.name}")
print(f"Filter: {rule.filter}")
if rule.action:
print(f"Action: {rule.action}")
# Delete a rule
admin_client.delete_rule("my-topic", "subscription1", "high-priority-rule")
Get information about the Service Bus namespace.
def get_namespace_properties(self, **kwargs) -> NamespaceProperties:
"""
Get namespace properties and information.
Returns:
NamespaceProperties with namespace information
Raises:
- ServiceBusError: For Service Bus related errors
"""
admin_client = ServiceBusAdministrationClient.from_connection_string("your_connection_string")
# Get namespace information
namespace_info = admin_client.get_namespace_properties()
print(f"Namespace: {namespace_info.name}")
print(f"SKU: {namespace_info.messaging_sku}")
print(f"Created: {namespace_info.created_at_utc}")
print(f"Modified: {namespace_info.modified_at_utc}")
Configuration settings for Service Bus queues.
class QueueProperties:
@property
def name(self) -> str: ...
@property
def max_size_in_megabytes(self) -> int: ...
@property
def default_message_time_to_live(self) -> timedelta: ...
@property
def lock_duration(self) -> timedelta: ...
@property
def max_delivery_count(self) -> int: ...
@property
def enable_dead_lettering_on_message_expiration(self) -> bool: ...
@property
def enable_sessions(self) -> bool: ...
@property
def enable_duplicate_detection(self) -> bool: ...
@property
def duplicate_detection_history_time_window(self) -> timedelta: ...
@property
def enable_batched_operations(self) -> bool: ...
@property
def status(self) -> EntityStatus: ...
@property
def enable_partitioning(self) -> bool: ...
@property
def forward_to(self) -> Optional[str]: ...
@property
def forward_dead_lettered_messages_to(self) -> Optional[str]: ...
@property
def auto_delete_on_idle(self) -> timedelta: ...
@property
def authorization_rules(self) -> List[AuthorizationRule]: ...Runtime metrics and information for Service Bus queues.
class QueueRuntimeProperties:
@property
def name(self) -> str: ...
@property
def active_message_count(self) -> int: ...
@property
def dead_letter_message_count(self) -> int: ...
@property
def scheduled_message_count(self) -> int: ...
@property
def transfer_dead_letter_message_count(self) -> int: ...
@property
def transfer_message_count(self) -> int: ...
@property
def size_in_bytes(self) -> int: ...
@property
def created_at_utc(self) -> datetime: ...
@property
def updated_at_utc(self) -> datetime: ...
@property
def accessed_at_utc(self) -> datetime: ...
For asynchronous operations, use the async version from the azure.servicebus.aio.management module.
from azure.servicebus.aio.management import ServiceBusAdministrationClient
# All methods have async equivalents
class ServiceBusAdministrationClient:
async def create_queue(self, queue_name: str, **kwargs) -> QueueProperties: ...
async def get_queue(self, queue_name: str, **kwargs) -> QueueProperties: ...
async def delete_queue(self, queue_name: str, **kwargs) -> None: ...
# ... (all other methods similarly async)
import asyncio
from azure.servicebus.aio.management import ServiceBusAdministrationClient
async def manage_entities():
    """Create a queue, then list every queue with its active-message count.

    Uses the async administration client built from a connection string.
    """
    admin_client = ServiceBusAdministrationClient.from_connection_string("your_connection_string")
    # `async with` closes the client's underlying transport when the block
    # exits, including when one of the awaited calls raises.
    async with admin_client:
        # Create queue asynchronously
        created_queue = await admin_client.create_queue("async-queue")
        print(f"Created queue: {created_queue.name}")

        # List queues asynchronously
        async for queue in admin_client.list_queues():
            print(f"Queue: {queue.name}")
            # Get runtime properties
            runtime = await admin_client.get_queue_runtime_properties(queue.name)
            print(f" Active messages: {runtime.active_message_count}")
asyncio.run(manage_entities())
Install with Tessl CLI
npx tessl i tessl/pypi-azure-servicebus
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10