CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure-mgmt-servicebus

Microsoft Azure Service Bus Management Client Library for programmatic control of Service Bus namespaces, queues, topics, subscriptions, and authorization rules

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/queue-operations.md

Queue Operations

Queue lifecycle management, configuration, and authorization rule management for point-to-point messaging scenarios. Service Bus queues provide reliable message delivery with features like dead lettering, message sessions, duplicate detection, and automatic forwarding.

Capabilities

Queue Lifecycle Management

Create, retrieve, update, delete, and list Service Bus queues within a namespace.

def list_by_namespace(
    self,
    resource_group_name: str,
    namespace_name: str,
    skip: Optional[int] = None,
    top: Optional[int] = None
) -> ItemPaged[SBQueue]:
    """List the queues that belong to a Service Bus namespace.

    Iterate over the returned ``ItemPaged`` to visit each queue. ``skip``
    and ``top`` are optional paging controls and both default to ``None``.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        skip (int, optional): Number of queues to skip.
        top (int, optional): Maximum number of queues to return.

    Returns:
        ItemPaged[SBQueue]: Iterable of queue resources.
    """

def create_or_update(
    self,
    resource_group_name: str,
    namespace_name: str,
    queue_name: str,
    parameters: SBQueue
) -> SBQueue:
    """Create a Service Bus queue, or update it if it already exists.

    The desired configuration is supplied as an ``SBQueue`` built from
    keyword arguments, for example ``max_size_in_megabytes`` or
    ``default_message_time_to_live``.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        queue_name (str): Name of the queue.
        parameters (SBQueue): Queue configuration parameters.

    Returns:
        SBQueue: The created or updated queue.
    """

def get(
    self,
    resource_group_name: str,
    namespace_name: str,
    queue_name: str
) -> SBQueue:
    """Get the details of a specific queue.

    The returned ``SBQueue`` carries the queue configuration together with
    read-only statistics such as ``message_count``, ``size_in_bytes`` and
    ``count_details``.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        queue_name (str): Name of the queue.

    Returns:
        SBQueue: The queue resource.
    """

def delete(
    self,
    resource_group_name: str,
    namespace_name: str,
    queue_name: str
) -> None:
    """Delete a queue from a namespace.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        queue_name (str): Name of the queue.

    Returns:
        None
    """

Queue Authorization Rules

Manage authorization rules for queue-level access control with specific rights (Send, Listen, Manage).

def list_authorization_rules(
    self,
    resource_group_name: str,
    namespace_name: str,
    queue_name: str
) -> ItemPaged[SBAuthorizationRule]:
    """List the authorization rules defined on a queue.

    Each returned ``SBAuthorizationRule`` exposes the ``rights`` it grants.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        queue_name (str): Name of the queue.

    Returns:
        ItemPaged[SBAuthorizationRule]: Iterable of authorization rules.
    """

def create_or_update_authorization_rule(
    self,
    resource_group_name: str,
    namespace_name: str,
    queue_name: str,
    authorization_rule_name: str,
    parameters: SBAuthorizationRule
) -> SBAuthorizationRule:
    """Create an authorization rule on a queue, or update an existing one.

    The rights to grant are listed in ``parameters.rights``, for example
    ``[AccessRights.LISTEN]`` for a consumer or ``[AccessRights.SEND]`` for
    a producer.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        queue_name (str): Name of the queue.
        authorization_rule_name (str): Name of the authorization rule.
        parameters (SBAuthorizationRule): Authorization rule parameters.

    Returns:
        SBAuthorizationRule: The created or updated authorization rule.
    """

def get_authorization_rule(
    self,
    resource_group_name: str,
    namespace_name: str,
    queue_name: str,
    authorization_rule_name: str
) -> SBAuthorizationRule:
    """Get a single authorization rule of a queue by name.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        queue_name (str): Name of the queue.
        authorization_rule_name (str): Name of the authorization rule.

    Returns:
        SBAuthorizationRule: The authorization rule, including its ``rights``.
    """

def delete_authorization_rule(
    self,
    resource_group_name: str,
    namespace_name: str,
    queue_name: str,
    authorization_rule_name: str
) -> None:
    """Delete an authorization rule from a queue.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        queue_name (str): Name of the queue.
        authorization_rule_name (str): Name of the authorization rule.

    Returns:
        None
    """

Queue Access Key Management

Retrieve and regenerate the access keys of a queue-level authorization rule.

def list_keys(
    self,
    resource_group_name: str,
    namespace_name: str,
    queue_name: str,
    authorization_rule_name: str
) -> AccessKeys:
    """Get the access keys of a queue authorization rule.

    The result exposes ``primary_key`` / ``secondary_key`` as well as the
    matching ``primary_connection_string`` / ``secondary_connection_string``.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        queue_name (str): Name of the queue.
        authorization_rule_name (str): Name of the authorization rule.

    Returns:
        AccessKeys: Primary and secondary keys with connection strings.
    """

def regenerate_keys(
    self,
    resource_group_name: str,
    namespace_name: str,
    queue_name: str,
    authorization_rule_name: str,
    parameters: RegenerateAccessKeyParameters
) -> AccessKeys:
    """Regenerate the access keys of a queue authorization rule.

    NOTE(review): ``RegenerateAccessKeyParameters`` is not described on this
    page; presumably it selects which key to regenerate -- confirm against
    the model reference.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        queue_name (str): Name of the queue.
        authorization_rule_name (str): Name of the authorization rule.
        parameters (RegenerateAccessKeyParameters): Regeneration parameters.

    Returns:
        AccessKeys: New keys with connection strings.
    """

Usage Examples

Creating a Basic Queue

import os
from datetime import timedelta

from azure.identity import DefaultAzureCredential
from azure.mgmt.servicebus import ServiceBusManagementClient
from azure.mgmt.servicebus.models import SBQueue

# ID of the Azure subscription that owns the resource group. It is read from
# the environment so the example runs without a hard-coded value.
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]

client = ServiceBusManagementClient(DefaultAzureCredential(), subscription_id)

# Create a basic queue, setting only its maximum size and message TTL;
# every other option keeps its service default.
queue_params = SBQueue(
    max_size_in_megabytes=1024,  # 1GB
    default_message_time_to_live=timedelta(days=14)  # 14 days TTL
)

queue = client.queues.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    queue_name="orders-queue",
    parameters=queue_params
)

# The returned SBQueue reflects the queue as created by the service.
print(f"Queue created: {queue.name}")
print(f"Max size: {queue.max_size_in_megabytes} MB")

Creating a Queue with Advanced Features

from azure.mgmt.servicebus.models import EntityStatus

# Reuses `client`, `SBQueue` and `timedelta` from the basic queue example.
# Create a queue with advanced messaging features
advanced_queue_params = SBQueue(
    max_size_in_megabytes=5120,  # 5GB
    lock_duration=timedelta(minutes=5),  # Lock duration for message processing (5 minutes is the documented maximum)
    max_delivery_count=10,  # Max delivery attempts before dead lettering
    requires_duplicate_detection=True,  # Enable duplicate detection
    duplicate_detection_history_time_window=timedelta(minutes=10),  # How far back duplicates are tracked
    dead_lettering_on_message_expiration=True,  # Dead letter expired messages
    enable_batched_operations=True,  # Enable server-side batching
    requires_session=True,  # Enable message sessions for ordering
    default_message_time_to_live=timedelta(days=7),  # Messages expire after 7 days
    auto_delete_on_idle=timedelta(days=30),  # Auto-delete if idle for 30 days
    enable_partitioning=True,  # Enable partitioning for higher throughput
    status=EntityStatus.ACTIVE
)

# Same create_or_update call as for a basic queue; only the SBQueue differs.
advanced_queue = client.queues.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    queue_name="high-throughput-queue",
    parameters=advanced_queue_params
)

Creating a Queue with Message Forwarding

# Reuses `client` and `SBQueue` from the basic queue example.
# Create a queue that forwards messages to another queue
# NOTE(review): the forwarding destinations presumably must already exist in
# the same namespace before this call -- confirm against the service docs.
forwarding_queue_params = SBQueue(
    max_size_in_megabytes=2048,
    forward_to="processed-orders-queue",  # Forward all messages here
    forward_dead_lettered_messages_to="dead-letter-queue",  # Forward dead letters
    max_delivery_count=3,  # Dead letter after 3 failed delivery attempts
    dead_lettering_on_message_expiration=True  # Dead letter expired messages
)

forwarding_queue = client.queues.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    queue_name="input-queue",
    parameters=forwarding_queue_params
)

Managing Queue Authorization Rules

from azure.mgmt.servicebus.models import SBAuthorizationRule, AccessRights

# Reuses `client` from the basic queue example. Producers and consumers get
# separate rules so each side holds only the right it needs.

# Create a listen-only authorization rule for consumers
listen_rule = SBAuthorizationRule(
    rights=[AccessRights.LISTEN]
)

consumer_auth = client.queues.create_or_update_authorization_rule(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    queue_name="orders-queue",
    authorization_rule_name="ConsumerPolicy",
    parameters=listen_rule
)

# Create a send-only authorization rule for producers
send_rule = SBAuthorizationRule(
    rights=[AccessRights.SEND]
)

producer_auth = client.queues.create_or_update_authorization_rule(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",  
    queue_name="orders-queue",
    authorization_rule_name="ProducerPolicy",
    parameters=send_rule
)

# Get connection strings for each role
consumer_keys = client.queues.list_keys(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    queue_name="orders-queue",
    authorization_rule_name="ConsumerPolicy"
)

producer_keys = client.queues.list_keys(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    queue_name="orders-queue",
    authorization_rule_name="ProducerPolicy"
)

# NOTE: connection strings are credentials for the queue. Printing them is
# for demonstration only; do not write them to logs in real deployments.
print(f"Consumer connection: {consumer_keys.primary_connection_string}")
print(f"Producer connection: {producer_keys.primary_connection_string}")

Queue Monitoring and Statistics

# Reuses `client` from the basic queue example.
# Get queue details with message statistics
queue_info = client.queues.get(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    queue_name="orders-queue"
)

# Top-level, read-only statistics of the queue
print(f"Queue: {queue_info.name}")
print(f"Status: {queue_info.status}")
print(f"Message count: {queue_info.message_count}")
print(f"Size in bytes: {queue_info.size_in_bytes}")
print(f"Created: {queue_info.created_at}")
print(f"Updated: {queue_info.updated_at}")

# count_details is Optional, so guard before reading the per-state counts
if queue_info.count_details:
    details = queue_info.count_details
    print(f"Active messages: {details.active_message_count}")
    print(f"Dead letter messages: {details.dead_letter_message_count}")
    print(f"Scheduled messages: {details.scheduled_message_count}")

Types

class SBQueue:
    """Service Bus queue resource.

    Instances are built from keyword arguments and passed to
    ``create_or_update``; the same type is returned by ``create_or_update``,
    ``get`` and ``list_by_namespace``. Configuration fields may be set by the
    caller, while identity and statistics fields are filled in on returned
    objects.
    """

    def __init__(self, **kwargs): ...

    # Resource identity (read-only); read as ``queue.name`` on returned queues
    name: Optional[str]

    # Queue statistics (read-only)
    count_details: Optional[MessageCountDetails]
    created_at: Optional[datetime]
    updated_at: Optional[datetime]
    accessed_at: Optional[datetime]
    size_in_bytes: Optional[int]
    message_count: Optional[int]

    # Queue configuration
    lock_duration: Optional[timedelta]  # Default: 1 minute, Max: 5 minutes
    max_size_in_megabytes: Optional[int]  # Default: 1024 MB
    max_message_size_in_kilobytes: Optional[int]  # Premium only, default: 1024 KB
    requires_duplicate_detection: Optional[bool]  # Default: False
    requires_session: Optional[bool]  # Default: False
    default_message_time_to_live: Optional[timedelta]  # Default: TimeSpan.Max
    dead_lettering_on_message_expiration: Optional[bool]  # Default: False
    duplicate_detection_history_time_window: Optional[timedelta]  # Default: 10 minutes
    max_delivery_count: Optional[int]  # Default: 10
    status: Optional[Union[str, EntityStatus]]  # Default: Active
    enable_batched_operations: Optional[bool]  # Default: True
    auto_delete_on_idle: Optional[timedelta]  # Minimum: 5 minutes
    enable_partitioning: Optional[bool]  # Default: False
    enable_express: Optional[bool]  # Default: False
    forward_to: Optional[str]  # Queue/topic name for message forwarding
    forward_dead_lettered_messages_to: Optional[str]  # Dead letter forwarding destination

class MessageCountDetails:
    """Message counts of a queue broken down by category.

    Exposed on a queue through ``SBQueue.count_details``; every field is
    optional.
    """

    def __init__(self, **kwargs): ...
    
    active_message_count: Optional[int]
    dead_letter_message_count: Optional[int]
    scheduled_message_count: Optional[int]
    transfer_message_count: Optional[int]
    transfer_dead_letter_message_count: Optional[int]

class SBAuthorizationRule:
    """Authorization rule granting a set of access rights on a queue."""

    def __init__(self, **kwargs): ...
    
    # Rights granted by the rule, e.g. [AccessRights.LISTEN] or [AccessRights.SEND]
    rights: List[Union[str, AccessRights]]

class AccessKeys:
    """Keys and connection strings of an authorization rule.

    Returned by ``list_keys`` and ``regenerate_keys``.
    """

    def __init__(self, **kwargs): ...
    
    primary_connection_string: Optional[str]
    secondary_connection_string: Optional[str]
    alias_primary_connection_string: Optional[str]  # For GEO DR scenarios
    alias_secondary_connection_string: Optional[str]  # For GEO DR scenarios
    primary_key: Optional[str]
    secondary_key: Optional[str]
    key_name: Optional[str]

Install with Tessl CLI

npx tessl i tessl/pypi-azure-mgmt-servicebus

docs

authorization-security.md

disaster-recovery.md

index.md

message-filtering-rules.md

migration-monitoring.md

namespace-management.md

queue-operations.md

topic-subscription-management.md

tile.json