Microsoft Azure Service Bus Management Client Library for programmatic control of Service Bus namespaces, queues, topics, subscriptions, and authorization rules
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Queue lifecycle management, configuration, and authorization rule management for point-to-point messaging scenarios. Service Bus queues provide reliable message delivery with features like dead lettering, message sessions, duplicate detection, and automatic forwarding.
Create, retrieve, update, delete, and list Service Bus queues within a namespace.
def list_by_namespace(
self,
resource_group_name: str,
namespace_name: str,
skip: Optional[int] = None,
top: Optional[int] = None
) -> ItemPaged[SBQueue]:
"""List queues in a namespace.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
skip (int, optional): Number of queues to skip.
top (int, optional): Maximum number of queues to return.
Returns:
ItemPaged[SBQueue]: Iterable of queue resources.
"""
def create_or_update(
self,
resource_group_name: str,
namespace_name: str,
queue_name: str,
parameters: SBQueue
) -> SBQueue:
"""Create or update a Service Bus queue.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
queue_name (str): Name of the queue.
parameters (SBQueue): Queue configuration parameters.
Returns:
SBQueue: The created or updated queue.
"""
def get(
self,
resource_group_name: str,
namespace_name: str,
queue_name: str
) -> SBQueue:
"""Get details of a specific queue.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
queue_name (str): Name of the queue.
Returns:
SBQueue: The queue resource.
"""
def delete(
self,
resource_group_name: str,
namespace_name: str,
queue_name: str
) -> None:
"""Delete a queue.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
queue_name (str): Name of the queue.
"""Manage authorization rules for queue-level access control with specific rights (Send, Listen, Manage).
def list_authorization_rules(
self,
resource_group_name: str,
namespace_name: str,
queue_name: str
) -> ItemPaged[SBAuthorizationRule]:
"""List authorization rules for a queue.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
queue_name (str): Name of the queue.
Returns:
ItemPaged[SBAuthorizationRule]: Iterable of authorization rules.
"""
def create_or_update_authorization_rule(
self,
resource_group_name: str,
namespace_name: str,
queue_name: str,
authorization_rule_name: str,
parameters: SBAuthorizationRule
) -> SBAuthorizationRule:
"""Create or update an authorization rule for a queue.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
queue_name (str): Name of the queue.
authorization_rule_name (str): Name of the authorization rule.
parameters (SBAuthorizationRule): Authorization rule parameters.
Returns:
SBAuthorizationRule: The created or updated authorization rule.
"""
def get_authorization_rule(
self,
resource_group_name: str,
namespace_name: str,
queue_name: str,
authorization_rule_name: str
) -> SBAuthorizationRule:
"""Get an authorization rule for a queue.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
queue_name (str): Name of the queue.
authorization_rule_name (str): Name of the authorization rule.
Returns:
SBAuthorizationRule: The authorization rule.
"""
def delete_authorization_rule(
self,
resource_group_name: str,
namespace_name: str,
queue_name: str,
authorization_rule_name: str
) -> None:
"""Delete an authorization rule for a queue.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
queue_name (str): Name of the queue.
authorization_rule_name (str): Name of the authorization rule.
"""Generate and manage access keys for queue-specific authentication.
def list_keys(
self,
resource_group_name: str,
namespace_name: str,
queue_name: str,
authorization_rule_name: str
) -> AccessKeys:
"""Get access keys for a queue authorization rule.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
queue_name (str): Name of the queue.
authorization_rule_name (str): Name of the authorization rule.
Returns:
AccessKeys: Primary and secondary keys with connection strings.
"""
def regenerate_keys(
self,
resource_group_name: str,
namespace_name: str,
queue_name: str,
authorization_rule_name: str,
parameters: RegenerateAccessKeyParameters
) -> AccessKeys:
"""Regenerate access keys for a queue authorization rule.
Args:
resource_group_name (str): Name of the resource group.
namespace_name (str): Name of the namespace.
queue_name (str): Name of the queue.
authorization_rule_name (str): Name of the authorization rule.
parameters (RegenerateAccessKeyParameters): Regeneration parameters.
Returns:
AccessKeys: New keys with connection strings.
"""from azure.mgmt.servicebus import ServiceBusManagementClient
from azure.mgmt.servicebus.models import SBQueue
from azure.identity import DefaultAzureCredential
from datetime import timedelta
client = ServiceBusManagementClient(DefaultAzureCredential(), subscription_id)
# Create a basic queue with default settings
queue_params = SBQueue(
max_size_in_megabytes=1024, # 1GB
default_message_time_to_live=timedelta(days=14) # 14 days TTL
)
queue = client.queues.create_or_update(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
queue_name="orders-queue",
parameters=queue_params
)
print(f"Queue created: {queue.name}")
print(f"Max size: {queue.max_size_in_megabytes} MB")from azure.mgmt.servicebus.models import EntityStatus
# Create a queue with advanced messaging features
advanced_queue_params = SBQueue(
max_size_in_megabytes=5120, # 5GB
lock_duration=timedelta(minutes=5), # Lock duration for message processing
max_delivery_count=10, # Max delivery attempts before dead lettering
requires_duplicate_detection=True, # Enable duplicate detection
duplicate_detection_history_time_window=timedelta(minutes=10),
dead_lettering_on_message_expiration=True, # Dead letter expired messages
enable_batched_operations=True, # Enable server-side batching
requires_session=True, # Enable message sessions for ordering
default_message_time_to_live=timedelta(days=7),
auto_delete_on_idle=timedelta(days=30), # Auto-delete if idle for 30 days
enable_partitioning=True, # Enable partitioning for higher throughput
status=EntityStatus.ACTIVE
)
advanced_queue = client.queues.create_or_update(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
queue_name="high-throughput-queue",
parameters=advanced_queue_params
)# Create a queue that forwards messages to another queue
forwarding_queue_params = SBQueue(
max_size_in_megabytes=2048,
forward_to="processed-orders-queue", # Forward all messages here
forward_dead_lettered_messages_to="dead-letter-queue", # Forward dead letters
max_delivery_count=3,
dead_lettering_on_message_expiration=True
)
forwarding_queue = client.queues.create_or_update(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
queue_name="input-queue",
parameters=forwarding_queue_params
)from azure.mgmt.servicebus.models import SBAuthorizationRule, AccessRights
# Create a listen-only authorization rule for consumers
listen_rule = SBAuthorizationRule(
rights=[AccessRights.LISTEN]
)
consumer_auth = client.queues.create_or_update_authorization_rule(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
queue_name="orders-queue",
authorization_rule_name="ConsumerPolicy",
parameters=listen_rule
)
# Create a send-only authorization rule for producers
send_rule = SBAuthorizationRule(
rights=[AccessRights.SEND]
)
producer_auth = client.queues.create_or_update_authorization_rule(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
queue_name="orders-queue",
authorization_rule_name="ProducerPolicy",
parameters=send_rule
)
# Get connection strings for each role
consumer_keys = client.queues.list_keys(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
queue_name="orders-queue",
authorization_rule_name="ConsumerPolicy"
)
producer_keys = client.queues.list_keys(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
queue_name="orders-queue",
authorization_rule_name="ProducerPolicy"
)
print(f"Consumer connection: {consumer_keys.primary_connection_string}")
print(f"Producer connection: {producer_keys.primary_connection_string}")# Get queue details with message statistics
queue_info = client.queues.get(
resource_group_name="my-resource-group",
namespace_name="my-servicebus-namespace",
queue_name="orders-queue"
)
print(f"Queue: {queue_info.name}")
print(f"Status: {queue_info.status}")
print(f"Message count: {queue_info.message_count}")
print(f"Size in bytes: {queue_info.size_in_bytes}")
print(f"Created: {queue_info.created_at}")
print(f"Updated: {queue_info.updated_at}")
if queue_info.count_details:
details = queue_info.count_details
print(f"Active messages: {details.active_message_count}")
print(f"Dead letter messages: {details.dead_letter_message_count}")
print(f"Scheduled messages: {details.scheduled_message_count}")class SBQueue:
def __init__(self, **kwargs): ...
# Queue statistics (read-only)
count_details: Optional[MessageCountDetails]
created_at: Optional[datetime]
updated_at: Optional[datetime]
accessed_at: Optional[datetime]
size_in_bytes: Optional[int]
message_count: Optional[int]
# Queue configuration
lock_duration: Optional[timedelta] # Default: 1 minute, Max: 5 minutes
max_size_in_megabytes: Optional[int] # Default: 1024 MB
max_message_size_in_kilobytes: Optional[int] # Premium only, default: 1024 KB
requires_duplicate_detection: Optional[bool] # Default: False
requires_session: Optional[bool] # Default: False
default_message_time_to_live: Optional[timedelta] # Default: TimeSpan.Max
dead_lettering_on_message_expiration: Optional[bool] # Default: False
duplicate_detection_history_time_window: Optional[timedelta] # Default: 10 minutes
max_delivery_count: Optional[int] # Default: 10
status: Optional[Union[str, EntityStatus]] # Default: Active
enable_batched_operations: Optional[bool] # Default: True
auto_delete_on_idle: Optional[timedelta] # Minimum: 5 minutes
enable_partitioning: Optional[bool] # Default: False
enable_express: Optional[bool] # Default: False
forward_to: Optional[str] # Queue/topic name for message forwarding
forward_dead_lettered_messages_to: Optional[str] # Dead letter forwarding destination
class MessageCountDetails:
def __init__(self, **kwargs): ...
active_message_count: Optional[int]
dead_letter_message_count: Optional[int]
scheduled_message_count: Optional[int]
transfer_message_count: Optional[int]
transfer_dead_letter_message_count: Optional[int]
class SBAuthorizationRule:
def __init__(self, **kwargs): ...
rights: List[Union[str, AccessRights]]
class AccessKeys:
def __init__(self, **kwargs): ...
primary_connection_string: Optional[str]
secondary_connection_string: Optional[str]
alias_primary_connection_string: Optional[str] # For GEO DR scenarios
alias_secondary_connection_string: Optional[str] # For GEO DR scenarios
primary_key: Optional[str]
secondary_key: Optional[str]
key_name: Optional[str]Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-servicebus