Microsoft Azure Service Bus Management Client Library for programmatic control of Service Bus namespaces, queues, topics, subscriptions, and authorization rules
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Topic creation and management, subscription handling, and message filtering rules for publish-subscribe messaging patterns. Topics enable one-to-many communication where messages published to a topic are delivered to multiple subscribing applications through subscriptions.
Create, retrieve, update, delete, and list Service Bus topics within a namespace.
def list_by_namespace(
self,
resource_group_name: str,
namespace_name: str,
skip: Optional[int] = None,
top: Optional[int] = None
) -> ItemPaged[SBTopic]:
"""List topics in a namespace.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
skip (int, optional): Number of topics to skip.
top (int, optional): Maximum number of topics to return.
Returns:
ItemPaged[SBTopic]: Iterable of topic resources.
"""
def create_or_update(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str,
parameters: SBTopic
) -> SBTopic:
"""Create or update a Service Bus topic.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
parameters (SBTopic): Topic configuration parameters.
Returns:
SBTopic: The created or updated topic.
"""
def get(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str
) -> SBTopic:
"""Get details of a specific topic.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
Returns:
SBTopic: The topic resource.
"""
def delete(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str
) -> None:
"""Delete a topic.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
"""Manage authorization rules for topic-level access control with specific rights (Send, Listen, Manage).
def list_authorization_rules(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str
) -> ItemPaged[SBAuthorizationRule]:
"""List authorization rules for a topic.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
Returns:
ItemPaged[SBAuthorizationRule]: Iterable of authorization rules.
"""
def create_or_update_authorization_rule(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str,
authorization_rule_name: str,
parameters: SBAuthorizationRule
) -> SBAuthorizationRule:
"""Create or update an authorization rule for a topic.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
authorization_rule_name (str): Name of the authorization rule.
parameters (SBAuthorizationRule): Authorization rule parameters.
Returns:
SBAuthorizationRule: The created or updated authorization rule.
"""
def get_authorization_rule(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str,
authorization_rule_name: str
) -> SBAuthorizationRule:
"""Get an authorization rule for a topic.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
authorization_rule_name (str): Name of the authorization rule.
Returns:
SBAuthorizationRule: The authorization rule.
"""
def delete_authorization_rule(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str,
authorization_rule_name: str
) -> None:
"""Delete an authorization rule for a topic.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
authorization_rule_name (str): Name of the authorization rule.
"""Generate and manage access keys for topic-specific authentication.
def list_keys(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str,
authorization_rule_name: str
) -> AccessKeys:
"""Get access keys for a topic authorization rule.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
authorization_rule_name (str): Name of the authorization rule.
Returns:
AccessKeys: Primary and secondary keys with connection strings.
"""
def regenerate_keys(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str,
authorization_rule_name: str,
parameters: RegenerateAccessKeyParameters
) -> AccessKeys:
"""Regenerate access keys for a topic authorization rule.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
authorization_rule_name (str): Name of the authorization rule.
parameters (RegenerateAccessKeyParameters): Regeneration parameters.
Returns:
AccessKeys: New keys with connection strings.
"""Create, retrieve, update, delete, and list subscriptions for topics to enable message consumption.
def list_by_topic(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str,
skip: Optional[int] = None,
top: Optional[int] = None
) -> ItemPaged[SBSubscription]:
"""List subscriptions for a topic.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
skip (int, optional): Number of subscriptions to skip.
top (int, optional): Maximum number of subscriptions to return.
Returns:
ItemPaged[SBSubscription]: Iterable of subscription resources.
"""
def create_or_update(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str,
subscription_name: str,
parameters: SBSubscription
) -> SBSubscription:
"""Create or update a subscription.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
subscription_name (str): Name of the subscription.
parameters (SBSubscription): Subscription configuration parameters.
Returns:
SBSubscription: The created or updated subscription.
"""
def get(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str,
subscription_name: str
) -> SBSubscription:
"""Get details of a specific subscription.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
subscription_name (str): Name of the subscription.
Returns:
SBSubscription: The subscription resource.
"""
def delete(
self,
resource_group_name: str,
namespace_name: str,
topic_name: str,
subscription_name: str
) -> None:
"""Delete a subscription.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
topic_name (str): Name of the topic.
subscription_name (str): Name of the subscription.
"""from azure.mgmt.servicebus import ServiceBusManagementClient
from azure.mgmt.servicebus.models import SBTopic
from azure.identity import DefaultAzureCredential
from datetime import timedelta
client = ServiceBusManagementClient(DefaultAzureCredential(), subscription_id)
# Create a basic topic
topic_params = SBTopic(
max_size_in_megabytes=2048, # 2GB
default_message_time_to_live=timedelta(days=7) # 7 days TTL
)
topic = client.topics.create_or_update(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
topic_name="order-events",
parameters=topic_params
)
print(f"Topic created: {topic.name}")
print(f"Max size: {topic.max_size_in_megabytes} MB")from azure.mgmt.servicebus.models import EntityStatus
# Create a topic with advanced messaging features
advanced_topic_params = SBTopic(
max_size_in_megabytes=5120, # 5GB
requires_duplicate_detection=True, # Enable duplicate detection
duplicate_detection_history_time_window=timedelta(minutes=10),
enable_batched_operations=True, # Enable server-side batching
support_ordering=True, # Enable message ordering
auto_delete_on_idle=timedelta(days=30), # Auto-delete if idle for 30 days
enable_partitioning=True, # Enable partitioning for higher throughput
enable_express=False, # Disable express mode for durability
status=EntityStatus.ACTIVE,
default_message_time_to_live=timedelta(days=14)
)
advanced_topic = client.topics.create_or_update(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
topic_name="high-volume-events",
parameters=advanced_topic_params
)from azure.mgmt.servicebus.models import SBSubscription
# Create a basic subscription
basic_subscription_params = SBSubscription(
lock_duration=timedelta(minutes=1), # Lock duration for message processing
max_delivery_count=10, # Max delivery attempts before dead lettering
dead_lettering_on_message_expiration=True, # Dead letter expired messages
enable_batched_operations=True,
default_message_time_to_live=timedelta(days=7)
)
basic_subscription = client.subscriptions.create_or_update(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
topic_name="order-events",
subscription_name="order-processor",
parameters=basic_subscription_params
)
# Create a session-enabled subscription for ordered processing
session_subscription_params = SBSubscription(
requires_session=True, # Enable message sessions
lock_duration=timedelta(minutes=5),
max_delivery_count=5,
dead_lettering_on_filter_evaluation_exceptions=True, # Dead letter on filter errors
dead_lettering_on_message_expiration=True,
default_message_time_to_live=timedelta(days=3)
)
session_subscription = client.subscriptions.create_or_update(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
topic_name="order-events",
subscription_name="sequential-processor",
parameters=session_subscription_params
)
# Create a subscription with message forwarding
forwarding_subscription_params = SBSubscription(
forward_to="processed-orders-queue", # Forward messages to a queue
forward_dead_lettered_messages_to="error-queue", # Forward dead letters
max_delivery_count=3,
dead_lettering_on_message_expiration=True
)
forwarding_subscription = client.subscriptions.create_or_update(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
topic_name="order-events",
subscription_name="forwarding-processor",
parameters=forwarding_subscription_params
)from azure.mgmt.servicebus.models import SBAuthorizationRule, AccessRights
# Create a send-only authorization rule for publishers
send_rule = SBAuthorizationRule(
rights=[AccessRights.SEND]
)
publisher_auth = client.topics.create_or_update_authorization_rule(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
topic_name="order-events",
authorization_rule_name="PublisherPolicy",
parameters=send_rule
)
# Create a manage rule for administrative access
manage_rule = SBAuthorizationRule(
rights=[AccessRights.MANAGE, AccessRights.SEND, AccessRights.LISTEN]
)
admin_auth = client.topics.create_or_update_authorization_rule(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
topic_name="order-events",
authorization_rule_name="AdminPolicy",
parameters=manage_rule
)
# Get connection strings
publisher_keys = client.topics.list_keys(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
topic_name="order-events",
authorization_rule_name="PublisherPolicy"
)
print(f"Publisher connection: {publisher_keys.primary_connection_string}")# Get topic statistics
topic_info = client.topics.get(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
topic_name="order-events"
)
print(f"Topic: {topic_info.name}")
print(f"Status: {topic_info.status}")
print(f"Subscription count: {topic_info.subscription_count}")
print(f"Size in bytes: {topic_info.size_in_bytes}")
print(f"Created: {topic_info.created_at}")
if topic_info.count_details:
details = topic_info.count_details
print(f"Active messages: {details.active_message_count}")
# Get subscription statistics
subscription_info = client.subscriptions.get(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
topic_name="order-events",
subscription_name="order-processor"
)
print(f"Subscription: {subscription_info.name}")
print(f"Message count: {subscription_info.message_count}")
print(f"Status: {subscription_info.status}")
if subscription_info.count_details:
details = subscription_info.count_details
print(f"Active messages: {details.active_message_count}")
print(f"Dead letter messages: {details.dead_letter_message_count}")class SBTopic:
def __init__(self, **kwargs): ...
# Topic statistics (read-only)
size_in_bytes: Optional[int]
created_at: Optional[datetime]
updated_at: Optional[datetime]
accessed_at: Optional[datetime]
subscription_count: Optional[int]
count_details: Optional[MessageCountDetails]
# Topic configuration
default_message_time_to_live: Optional[timedelta] # Default: TimeSpan.Max
max_size_in_megabytes: Optional[int] # Default: 1024 MB
max_message_size_in_kilobytes: Optional[int] # Premium only, default: 1024 KB
requires_duplicate_detection: Optional[bool] # Default: False
duplicate_detection_history_time_window: Optional[timedelta] # Default: 10 minutes
enable_batched_operations: Optional[bool] # Default: True
status: Optional[Union[str, EntityStatus]] # Default: Active
support_ordering: Optional[bool] # Default: False
auto_delete_on_idle: Optional[timedelta] # Minimum: 5 minutes
enable_partitioning: Optional[bool] # Default: False
enable_express: Optional[bool] # Default: False
class SBSubscription:
def __init__(self, **kwargs): ...
# Subscription statistics (read-only)
message_count: Optional[int]
created_at: Optional[datetime]
accessed_at: Optional[datetime]
updated_at: Optional[datetime]
count_details: Optional[MessageCountDetails]
# Subscription configuration
lock_duration: Optional[timedelta] # Default: 1 minute, Max: 5 minutes
requires_session: Optional[bool] # Default: False
default_message_time_to_live: Optional[timedelta] # Default: TimeSpan.Max
dead_lettering_on_filter_evaluation_exceptions: Optional[bool] # Default: True
dead_lettering_on_message_expiration: Optional[bool] # Default: False
duplicate_detection_history_time_window: Optional[timedelta] # Default: 10 minutes
max_delivery_count: Optional[int] # Default: 10
status: Optional[Union[str, EntityStatus]] # Default: Active
enable_batched_operations: Optional[bool] # Default: True
auto_delete_on_idle: Optional[timedelta] # Minimum: 5 minutes
forward_to: Optional[str] # Queue/topic name for message forwarding
forward_dead_lettered_messages_to: Optional[str] # Dead letter forwarding destinationInstall with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-servicebus