CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-azure-mgmt-servicebus

Microsoft Azure Service Bus Management Client Library for programmatic control of Service Bus namespaces, queues, topics, subscriptions, and authorization rules

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

topic-subscription-management.mddocs/

Topic and Subscription Management

Topic creation and management, subscription handling, and message filtering rules for publish-subscribe messaging patterns. Topics enable one-to-many communication where messages published to a topic are delivered to multiple subscribing applications through subscriptions.

Capabilities

Topic Lifecycle Management

Create, retrieve, update, delete, and list Service Bus topics within a namespace.

def list_by_namespace(
    self,
    resource_group_name: str,
    namespace_name: str,
    skip: Optional[int] = None,
    top: Optional[int] = None
) -> ItemPaged[SBTopic]:
    """List topics in a namespace.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        skip (int, optional): Number of topics to skip.
        top (int, optional): Maximum number of topics to return.
    
    Returns:
        ItemPaged[SBTopic]: Iterable of topic resources.
    """

def create_or_update(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    parameters: SBTopic
) -> SBTopic:
    """Create or update a Service Bus topic.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        parameters (SBTopic): Topic configuration parameters.
    
    Returns:
        SBTopic: The created or updated topic.
    """

def get(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str
) -> SBTopic:
    """Get details of a specific topic.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
    
    Returns:
        SBTopic: The topic resource.
    """

def delete(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str
) -> None:
    """Delete a topic.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
    """

Topic Authorization Rules

Manage authorization rules for topic-level access control with specific rights (Send, Listen, Manage).

def list_authorization_rules(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str
) -> ItemPaged[SBAuthorizationRule]:
    """List authorization rules for a topic.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
    
    Returns:
        ItemPaged[SBAuthorizationRule]: Iterable of authorization rules.
    """

def create_or_update_authorization_rule(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    authorization_rule_name: str,
    parameters: SBAuthorizationRule
) -> SBAuthorizationRule:
    """Create or update an authorization rule for a topic.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        authorization_rule_name (str): Name of the authorization rule.
        parameters (SBAuthorizationRule): Authorization rule parameters.
    
    Returns:
        SBAuthorizationRule: The created or updated authorization rule.
    """

def get_authorization_rule(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    authorization_rule_name: str
) -> SBAuthorizationRule:
    """Get an authorization rule for a topic.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        authorization_rule_name (str): Name of the authorization rule.
    
    Returns:
        SBAuthorizationRule: The authorization rule.
    """

def delete_authorization_rule(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    authorization_rule_name: str
) -> None:
    """Delete an authorization rule for a topic.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        authorization_rule_name (str): Name of the authorization rule.
    """

Topic Access Key Management

Generate and manage access keys for topic-specific authentication.

def list_keys(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    authorization_rule_name: str
) -> AccessKeys:
    """Get access keys for a topic authorization rule.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        authorization_rule_name (str): Name of the authorization rule.
    
    Returns:
        AccessKeys: Primary and secondary keys with connection strings.
    """

def regenerate_keys(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    authorization_rule_name: str,
    parameters: RegenerateAccessKeyParameters
) -> AccessKeys:
    """Regenerate access keys for a topic authorization rule.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        authorization_rule_name (str): Name of the authorization rule.
        parameters (RegenerateAccessKeyParameters): Regeneration parameters.
    
    Returns:
        AccessKeys: New keys with connection strings.
    """

Subscription Lifecycle Management

Create, retrieve, update, delete, and list subscriptions for topics to enable message consumption.

def list_by_topic(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    skip: Optional[int] = None,
    top: Optional[int] = None
) -> ItemPaged[SBSubscription]:
    """List subscriptions for a topic.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        skip (int, optional): Number of subscriptions to skip.
        top (int, optional): Maximum number of subscriptions to return.
    
    Returns:
        ItemPaged[SBSubscription]: Iterable of subscription resources.
    """

def create_or_update(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str,
    parameters: SBSubscription
) -> SBSubscription:
    """Create or update a subscription.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
        parameters (SBSubscription): Subscription configuration parameters.
    
    Returns:
        SBSubscription: The created or updated subscription.
    """

def get(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str
) -> SBSubscription:
    """Get details of a specific subscription.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
    
    Returns:
        SBSubscription: The subscription resource.
    """

def delete(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str
) -> None:
    """Delete a subscription.
    
    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
    """

Usage Examples

Creating a Basic Topic

from azure.mgmt.servicebus import ServiceBusManagementClient
from azure.mgmt.servicebus.models import SBTopic
from azure.identity import DefaultAzureCredential
from datetime import timedelta

client = ServiceBusManagementClient(DefaultAzureCredential(), subscription_id)

# Create a basic topic
topic_params = SBTopic(
    max_size_in_megabytes=2048,  # 2GB
    default_message_time_to_live=timedelta(days=7)  # 7 days TTL
)

topic = client.topics.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    parameters=topic_params
)

print(f"Topic created: {topic.name}")
print(f"Max size: {topic.max_size_in_megabytes} MB")

Creating a Topic with Advanced Features

from azure.mgmt.servicebus.models import EntityStatus

# Create a topic with advanced messaging features  
advanced_topic_params = SBTopic(
    max_size_in_megabytes=5120,  # 5GB
    requires_duplicate_detection=True,  # Enable duplicate detection
    duplicate_detection_history_time_window=timedelta(minutes=10),
    enable_batched_operations=True,  # Enable server-side batching
    support_ordering=True,  # Enable message ordering
    auto_delete_on_idle=timedelta(days=30),  # Auto-delete if idle for 30 days
    enable_partitioning=True,  # Enable partitioning for higher throughput
    enable_express=False,  # Disable express mode for durability
    status=EntityStatus.ACTIVE,
    default_message_time_to_live=timedelta(days=14)
)

advanced_topic = client.topics.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="high-volume-events",
    parameters=advanced_topic_params
)

Creating Subscriptions for a Topic

from azure.mgmt.servicebus.models import SBSubscription

# Create a basic subscription
basic_subscription_params = SBSubscription(
    lock_duration=timedelta(minutes=1),  # Lock duration for message processing
    max_delivery_count=10,  # Max delivery attempts before dead lettering
    dead_lettering_on_message_expiration=True,  # Dead letter expired messages
    enable_batched_operations=True,
    default_message_time_to_live=timedelta(days=7)
)

basic_subscription = client.subscriptions.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="order-processor",
    parameters=basic_subscription_params
)

# Create a session-enabled subscription for ordered processing
session_subscription_params = SBSubscription(
    requires_session=True,  # Enable message sessions
    lock_duration=timedelta(minutes=5),
    max_delivery_count=5,
    dead_lettering_on_filter_evaluation_exceptions=True,  # Dead letter on filter errors
    dead_lettering_on_message_expiration=True,
    default_message_time_to_live=timedelta(days=3)
)

session_subscription = client.subscriptions.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="sequential-processor",
    parameters=session_subscription_params
)

# Create a subscription with message forwarding
forwarding_subscription_params = SBSubscription(
    forward_to="processed-orders-queue",  # Forward messages to a queue
    forward_dead_lettered_messages_to="error-queue",  # Forward dead letters
    max_delivery_count=3,
    dead_lettering_on_message_expiration=True
)

forwarding_subscription = client.subscriptions.create_or_update(
    resource_group_name="my-resource-group", 
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="forwarding-processor",
    parameters=forwarding_subscription_params
)

Managing Topic Authorization Rules

from azure.mgmt.servicebus.models import SBAuthorizationRule, AccessRights

# Create a send-only authorization rule for publishers
send_rule = SBAuthorizationRule(
    rights=[AccessRights.SEND]
)

publisher_auth = client.topics.create_or_update_authorization_rule(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    authorization_rule_name="PublisherPolicy",
    parameters=send_rule
)

# Create a manage rule for administrative access
manage_rule = SBAuthorizationRule(
    rights=[AccessRights.MANAGE, AccessRights.SEND, AccessRights.LISTEN]
)

admin_auth = client.topics.create_or_update_authorization_rule(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events", 
    authorization_rule_name="AdminPolicy",
    parameters=manage_rule
)

# Get connection strings
publisher_keys = client.topics.list_keys(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    authorization_rule_name="PublisherPolicy"
)

print(f"Publisher connection: {publisher_keys.primary_connection_string}")

Topic and Subscription Monitoring

# Get topic statistics
topic_info = client.topics.get(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events"
)

print(f"Topic: {topic_info.name}")
print(f"Status: {topic_info.status}")
print(f"Subscription count: {topic_info.subscription_count}")
print(f"Size in bytes: {topic_info.size_in_bytes}")
print(f"Created: {topic_info.created_at}")

if topic_info.count_details:
    details = topic_info.count_details
    print(f"Active messages: {details.active_message_count}")

# Get subscription statistics  
subscription_info = client.subscriptions.get(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="order-processor"
)

print(f"Subscription: {subscription_info.name}")
print(f"Message count: {subscription_info.message_count}")
print(f"Status: {subscription_info.status}")

if subscription_info.count_details:
    details = subscription_info.count_details
    print(f"Active messages: {details.active_message_count}")
    print(f"Dead letter messages: {details.dead_letter_message_count}")

Types

class SBTopic:
    def __init__(self, **kwargs): ...
    
    # Topic statistics (read-only)
    size_in_bytes: Optional[int]
    created_at: Optional[datetime]
    updated_at: Optional[datetime] 
    accessed_at: Optional[datetime]
    subscription_count: Optional[int]
    count_details: Optional[MessageCountDetails]
    
    # Topic configuration
    default_message_time_to_live: Optional[timedelta]  # Default: TimeSpan.Max
    max_size_in_megabytes: Optional[int]  # Default: 1024 MB
    max_message_size_in_kilobytes: Optional[int]  # Premium only, default: 1024 KB
    requires_duplicate_detection: Optional[bool]  # Default: False
    duplicate_detection_history_time_window: Optional[timedelta]  # Default: 10 minutes
    enable_batched_operations: Optional[bool]  # Default: True
    status: Optional[Union[str, EntityStatus]]  # Default: Active
    support_ordering: Optional[bool]  # Default: False
    auto_delete_on_idle: Optional[timedelta]  # Minimum: 5 minutes
    enable_partitioning: Optional[bool]  # Default: False
    enable_express: Optional[bool]  # Default: False

class SBSubscription:
    def __init__(self, **kwargs): ...
    
    # Subscription statistics (read-only)
    message_count: Optional[int]
    created_at: Optional[datetime]
    accessed_at: Optional[datetime]
    updated_at: Optional[datetime]
    count_details: Optional[MessageCountDetails]
    
    # Subscription configuration
    lock_duration: Optional[timedelta]  # Default: 1 minute, Max: 5 minutes
    requires_session: Optional[bool]  # Default: False
    default_message_time_to_live: Optional[timedelta]  # Default: TimeSpan.Max
    dead_lettering_on_filter_evaluation_exceptions: Optional[bool]  # Default: True
    dead_lettering_on_message_expiration: Optional[bool]  # Default: False
    duplicate_detection_history_time_window: Optional[timedelta]  # Default: 10 minutes
    max_delivery_count: Optional[int]  # Default: 10
    status: Optional[Union[str, EntityStatus]]  # Default: Active
    enable_batched_operations: Optional[bool]  # Default: True
    auto_delete_on_idle: Optional[timedelta]  # Minimum: 5 minutes
    forward_to: Optional[str]  # Queue/topic name for message forwarding
    forward_dead_lettered_messages_to: Optional[str]  # Dead letter forwarding destination

Install with Tessl CLI

npx tessl i tessl/pypi-azure-mgmt-servicebus

docs

authorization-security.md

disaster-recovery.md

index.md

message-filtering-rules.md

migration-monitoring.md

namespace-management.md

queue-operations.md

topic-subscription-management.md

tile.json