Microsoft Azure Service Bus Management Client Library for programmatic control of Service Bus namespaces, queues, topics, subscriptions, and authorization rules
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Message filtering and routing configuration using SQL filters and correlation filters for subscription-based message processing. Rules enable fine-grained control over which messages are delivered to specific subscriptions, supporting complex routing scenarios and message processing patterns.
Create, retrieve, update, delete, and list message filtering rules for topic subscriptions.
def list_by_subscriptions(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str,
    skip: Optional[int] = None,
    top: Optional[int] = None
) -> ItemPaged[Rule]:
    """List rules for a subscription.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
        skip (int, optional): Number of rules to skip.
        top (int, optional): Maximum number of rules to return.

    Returns:
        ItemPaged[Rule]: Iterable of rule resources.
    """
def create_or_update(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str,
    rule_name: str,
    parameters: Rule
) -> Rule:
    """Create or update a subscription rule.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
        rule_name (str): Name of the rule.
        parameters (Rule): Rule configuration parameters.

    Returns:
        Rule: The created or updated rule.
    """
def get(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str,
    rule_name: str
) -> Rule:
    """Get details of a specific rule.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
        rule_name (str): Name of the rule.

    Returns:
        Rule: The rule resource.
    """
def delete(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str,
    rule_name: str
) -> None:
    """Delete a rule.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
        rule_name (str): Name of the rule.
    """


from azure.mgmt.servicebus import ServiceBusManagementClient
import os

from azure.mgmt.servicebus.models import Rule, SqlFilter, SqlRuleAction, FilterType
from azure.identity import DefaultAzureCredential

# Azure subscription that owns the Service Bus namespace, read from the
# environment so the example runs without editing.
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]
client = ServiceBusManagementClient(DefaultAzureCredential(), subscription_id)

# Create a rule with SQL filter for order priority
priority_filter = SqlFilter(
    sql_expression="Priority > 5",
    compatibility_level=20
)
priority_rule = Rule(
    filter_type=FilterType.SQL_FILTER,
    sql_filter=priority_filter
)
high_priority_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="high-priority-processor",
    rule_name="HighPriorityFilter",
    parameters=priority_rule
)

# Create a rule with SQL filter and action
region_filter = SqlFilter(
    sql_expression="Region = 'US-East' OR Region = 'US-West'",
    compatibility_level=20
)
# Add an action to modify message properties
region_action = SqlRuleAction(
    sql_expression="SET ProcessingRegion = 'US'"
)
region_rule = Rule(
    filter_type=FilterType.SQL_FILTER,
    sql_filter=region_filter,
    action=region_action
)
us_region_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="us-processor",
    rule_name="USRegionFilter",
    parameters=region_rule
)

# Create a complex SQL filter rule
complex_filter = SqlFilter(
    sql_expression="""
(OrderType = 'Premium' AND Amount > 1000)
OR (OrderType = 'Standard' AND Amount > 5000)
OR CustomerTier = 'Gold'
""",
    compatibility_level=20
)
complex_rule = Rule(
    filter_type=FilterType.SQL_FILTER,
    sql_filter=complex_filter
)
vip_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="vip-processor",
    rule_name="VIPCustomerFilter",
    parameters=complex_rule
)

from azure.mgmt.servicebus.models import CorrelationFilter
# Create a correlation filter based on message properties
order_correlation_filter = CorrelationFilter(
    properties={
        "Department": "Sales",
        "Region": "Europe"
    },
    correlation_id="order-processing",
    label="OrderEvent"
)
correlation_rule = Rule(
    filter_type=FilterType.CORRELATION_FILTER,
    correlation_filter=order_correlation_filter
)
europe_sales_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="europe-sales-processor",
    rule_name="EuropeSalesFilter",
    parameters=correlation_rule
)

# Create a correlation filter with session information
session_correlation_filter = CorrelationFilter(
    session_id="payment-processing",
    reply_to_session_id="payment-response",
    # Correlation filters compare by exact equality; wildcard characters are
    # not expanded, so supply the literal message ID. Use a SqlFilter with
    # LIKE when pattern matching is required.
    message_id="payment-0001",
    content_type="application/json"
)
session_correlation_rule = Rule(
    filter_type=FilterType.CORRELATION_FILTER,
    correlation_filter=session_correlation_filter
)
payment_session_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="payment-processor",
    rule_name="PaymentSessionFilter",
    parameters=session_correlation_rule
)

# Create a correlation filter for specific addresses
address_correlation_filter = CorrelationFilter(
    to="order-service",
    reply_to="response-queue"
)
address_correlation_rule = Rule(
    filter_type=FilterType.CORRELATION_FILTER,
    correlation_filter=address_correlation_filter
)
address_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="order-service-processor",
    rule_name="AddressFilter",
    parameters=address_correlation_rule
)

# Every subscription has a default rule ($Default) that accepts all messages
# Modifying or deleting that rule changes what the subscription receives by default.

# Fetch the default rule of the "order-processor" subscription
default_rule = client.rules.get(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="order-processor",
    rule_name="$Default",
)
print(f"Default rule filter type: {default_rule.filter_type}")

# Remove the default rule from "filtered-processor" so explicit rules are required
client.rules.delete(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="filtered-processor",
    rule_name="$Default",
)
# From here on, this subscription only receives messages that match an explicit rule

# List all rules for a subscription
rules = client.rules.list_by_subscriptions(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="order-processor"
)
print("Rules for order-processor subscription:")
for rule in rules:
    print(f"- Rule: {rule.name}")
    print(f" Filter Type: {rule.filter_type}")
    # A rule carries either a SQL filter or a correlation filter; print
    # whichever one is populated.
    if rule.sql_filter:
        print(f" SQL Expression: {rule.sql_filter.sql_expression}")
    if rule.correlation_filter:
        filter_props = rule.correlation_filter.properties or {}
        print(f" Correlation Properties: {filter_props}")
        if rule.correlation_filter.label:
            print(f" Label: {rule.correlation_filter.label}")
    if rule.action:
        print(f" Action: {rule.action.sql_expression}")
    # Blank line between rules
    print()

# Get specific rule details
specific_rule = client.rules.get(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="high-priority-processor",
    rule_name="HighPriorityFilter"
)
print(f"Rule: {specific_rule.name}")
print(f"SQL Filter: {specific_rule.sql_filter.sql_expression}")

# Create a rule that filters based on custom message properties
# Service Bus SQL filters have no date/time functions (DATEDIFF, GETUTCDATE)
# and no CASE expression: the only built-in functions are newid() and
# property(name). Filter on properties the sender stamps on the message.
custom_property_filter = SqlFilter(
    sql_expression="""
user.CustomerId IN ('12345', '67890')
AND user.OrderAmount > 1000
""",
    compatibility_level=20
)
# Add an action to enrich the message. Actions are a semicolon-separated list
# of SET / REMOVE statements; for tiered values, create one rule per tier, each
# with its own filter and action.
enrichment_action = SqlRuleAction(
    sql_expression="""
SET sys.Label = 'HighValueCustomer';
SET user.Priority = 'High';
SET user.EnrichmentId = newid()
"""
)
advanced_rule = Rule(
    filter_type=FilterType.SQL_FILTER,
    sql_filter=custom_property_filter,
    action=enrichment_action
)
customer_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="premium-customer-processor",
    rule_name="PremiumCustomerEnrichment",
    parameters=advanced_rule
)


class Rule:
    """Message filtering rule attached to a topic subscription."""

    def __init__(self, **kwargs): ...

    # Rule identification
    id: Optional[str]
    name: Optional[str]
    type: Optional[str]
    # Rule configuration
    action: Optional[SqlRuleAction]
    filter_type: Optional[Union[str, FilterType]]
    sql_filter: Optional[SqlFilter]
    correlation_filter: Optional[CorrelationFilter]
class SqlFilter:
    """SQL expression filter used by a subscription rule."""

    def __init__(self, **kwargs): ...

    sql_expression: Optional[str]  # SQL WHERE clause expression
    compatibility_level: Optional[int]  # Always 20
    requires_preprocessing: Optional[bool]  # Read-only
class CorrelationFilter:
    """Filter that matches messages on custom and standard message properties."""

    def __init__(self, **kwargs): ...

    # Custom properties dictionary for filtering
    properties: Optional[Dict[str, str]]
    # Standard message properties
    correlation_id: Optional[str]
    message_id: Optional[str]
    to: Optional[str]
    reply_to: Optional[str]
    label: Optional[str]
    session_id: Optional[str]
    reply_to_session_id: Optional[str]
    content_type: Optional[str]
class SqlRuleAction:
    """SQL action expression attached to a subscription rule."""

    def __init__(self, **kwargs): ...

    sql_expression: Optional[str]  # SQL action expression
    compatibility_level: Optional[int]  # Always 20
    requires_preprocessing: Optional[bool]  # Read-only
from enum import Enum
class FilterType(str, Enum):
SQL_FILTER = "SqlFilter"
CORRELATION_FILTER = "CorrelationFilter"# System properties filtering
"sys.MessageId = 'message-123'"
"sys.Label = 'OrderEvent'"
"sys.CorrelationId LIKE 'order-%'"
"sys.ContentType = 'application/json'"
"sys.DeliveryCount > 5"
"sys.EnqueuedTimeUtc > '2023-01-01T00:00:00Z'"
"sys.SequenceNumber > 1000"
"sys.Size > 1024"
"sys.TimeToLive < '00:30:00'" # 30 minutes
# User properties filtering
"user.Priority > 5"
"user.Region IN ('US-East', 'US-West')"
"user.Amount BETWEEN 100 AND 1000"
"user.CustomerType = 'Premium'"
"user.OrderDate >= '2023-01-01'"
# Complex expressions
"(user.Priority > 8 OR user.VIP = 'true') AND user.Region = 'Europe'"
"user.Amount > 1000 AND sys.Label LIKE '%urgent%'"
"DATEDIFF(hour, sys.EnqueuedTimeUtc, GETUTCDATE()) < 6"
# Pattern matching
"user.OrderId LIKE 'ORD-%'"
"sys.Label NOT LIKE '%test%'"
"user.Email LIKE '%@company.com'"# Set properties
"SET sys.Label = 'Processed'"
"SET user.ProcessedAt = GETUTCDATE()"
"SET user.ProcessingRegion = 'US-East'"
# Conditional actions
"""
SET user.Priority = CASE
WHEN user.Amount > 10000 THEN 'Critical'
WHEN user.Amount > 1000 THEN 'High'
ELSE 'Normal'
END
"""
# Mathematical operations
"SET user.TotalWithTax = user.Amount * 1.08"
"SET user.DaysOld = DATEDIFF(day, user.CreatedDate, GETUTCDATE())"
# String operations
"SET user.NormalizedEmail = LOWER(user.Email)"
"SET user.ShortId = SUBSTRING(user.LongId, 1, 8)"Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-servicebus