CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure-mgmt-servicebus

Microsoft Azure Service Bus Management Client Library for programmatic control of Service Bus namespaces, queues, topics, subscriptions, and authorization rules

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/message-filtering-rules.md

Message Filtering Rules

Message filtering and routing configuration using SQL filters and correlation filters for subscription-based message processing. Rules enable fine-grained control over which messages are delivered to specific subscriptions, supporting complex routing scenarios and message processing patterns.

Capabilities

Rule Lifecycle Management

Create, retrieve, update, delete, and list message filtering rules for topic subscriptions.

def list_by_subscriptions(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str,
    skip: Optional[int] = None,
    top: Optional[int] = None
) -> ItemPaged[Rule]:
    """List the rules attached to one topic subscription.

    The subscription is addressed by its full path: resource group,
    namespace, topic, and subscription name. The result is a pageable
    collection that can be consumed directly with a ``for`` loop.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
        skip (int, optional): Number of rules to skip. Defaults to None.
        top (int, optional): Maximum number of rules to return.
            Defaults to None.

    Returns:
        ItemPaged[Rule]: Iterable of rule resources.
    """

def create_or_update(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str,
    rule_name: str,
    parameters: Rule
) -> Rule:
    """Create a subscription rule, or update it if it already exists.

    The rule is addressed by resource group, namespace, topic,
    subscription, and rule name.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
        rule_name (str): Name of the rule.
        parameters (Rule): Rule configuration. The examples in this
            document build it from a ``filter_type`` together with the
            matching ``sql_filter`` or ``correlation_filter``, and
            optionally an ``action``.

    Returns:
        Rule: The created or updated rule.
    """

def get(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str,
    rule_name: str
) -> Rule:
    """Retrieve a single rule of a topic subscription by name.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
        rule_name (str): Name of the rule. The default rule of a
            subscription is addressed as ``"$Default"``.

    Returns:
        Rule: The rule resource.
    """

def delete(
    self,
    resource_group_name: str,
    namespace_name: str,
    topic_name: str,
    subscription_name: str,
    rule_name: str
) -> None:
    """Delete a rule from a topic subscription.

    Args:
        resource_group_name (str): Name of the resource group.
        namespace_name (str): Name of the namespace.
        topic_name (str): Name of the topic.
        subscription_name (str): Name of the subscription.
        rule_name (str): Name of the rule. Passing ``"$Default"``
            removes the subscription's default rule.

    Returns:
        None
    """

Usage Examples

Creating SQL Filter Rules

import os

from azure.identity import DefaultAzureCredential
from azure.mgmt.servicebus import ServiceBusManagementClient
from azure.mgmt.servicebus.models import Rule, SqlFilter, SqlRuleAction, FilterType

# The management client needs the Azure subscription ID. Read it from the
# environment so the snippet runs as written; a missing variable fails
# loudly with KeyError instead of a NameError on an undefined name.
subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"]

client = ServiceBusManagementClient(DefaultAzureCredential(), subscription_id)

# Create a rule with SQL filter for order priority
priority_filter = SqlFilter(
    sql_expression="Priority > 5",
    compatibility_level=20
)

priority_rule = Rule(
    filter_type=FilterType.SQL_FILTER,
    sql_filter=priority_filter
)

high_priority_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="high-priority-processor",
    rule_name="HighPriorityFilter",
    parameters=priority_rule
)

# Create a rule with SQL filter and action
region_filter = SqlFilter(
    sql_expression="Region = 'US-East' OR Region = 'US-West'",
    compatibility_level=20
)

# Add an action to modify message properties
region_action = SqlRuleAction(
    sql_expression="SET ProcessingRegion = 'US'"
)

region_rule = Rule(
    filter_type=FilterType.SQL_FILTER,
    sql_filter=region_filter,
    action=region_action
)

us_region_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="us-processor",
    rule_name="USRegionFilter",
    parameters=region_rule
)

# Create a complex SQL filter rule
complex_filter = SqlFilter(
    sql_expression="""
        (OrderType = 'Premium' AND Amount > 1000)
        OR (OrderType = 'Standard' AND Amount > 5000)
        OR CustomerTier = 'Gold'
    """,
    compatibility_level=20
)

complex_rule = Rule(
    filter_type=FilterType.SQL_FILTER,
    sql_filter=complex_filter
)

vip_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="vip-processor",
    rule_name="VIPCustomerFilter",
    parameters=complex_rule
)

Creating Correlation Filter Rules

from azure.mgmt.servicebus.models import CorrelationFilter, FilterType, Rule

# NOTE: `client` is the ServiceBusManagementClient created in the SQL filter
# example earlier in this document.

# Create a correlation filter based on message properties
order_correlation_filter = CorrelationFilter(
    properties={
        "Department": "Sales",
        "Region": "Europe"
    },
    correlation_id="order-processing",
    label="OrderEvent"
)

correlation_rule = Rule(
    filter_type=FilterType.CORRELATION_FILTER,
    correlation_filter=order_correlation_filter
)

europe_sales_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="europe-sales-processor",
    rule_name="EuropeSalesFilter",
    parameters=correlation_rule
)

# Create a correlation filter with session information.
# Correlation filters compare each configured property for equality; they do
# not support wildcards, so a value such as "payment-*" would only match a
# message whose ID is literally "payment-*". Use a SqlFilter with LIKE
# (e.g. "sys.MessageId LIKE 'payment-%'") when pattern matching is needed.
session_correlation_filter = CorrelationFilter(
    session_id="payment-processing",
    reply_to_session_id="payment-response",
    message_id="payment-001",  # Exact match only
    content_type="application/json"
)

session_correlation_rule = Rule(
    filter_type=FilterType.CORRELATION_FILTER,
    correlation_filter=session_correlation_filter
)

payment_session_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="payment-processor",
    rule_name="PaymentSessionFilter",
    parameters=session_correlation_rule
)

# Create a correlation filter for specific addresses
address_correlation_filter = CorrelationFilter(
    to="order-service",
    reply_to="response-queue"
)

address_correlation_rule = Rule(
    filter_type=FilterType.CORRELATION_FILTER,
    correlation_filter=address_correlation_filter
)

address_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="order-service-processor",
    rule_name="AddressFilter",
    parameters=address_correlation_rule
)

Managing Default Rules

# A subscription's default rule is named "$Default" and accepts all messages.
# Changing or removing that rule changes the subscription's default behavior.

# Coordinates shared by both calls below
topic_location = {
    "resource_group_name": "my-resource-group",
    "namespace_name": "my-servicebus-namespace",
    "topic_name": "order-events",
}

# Fetch the default rule of the order-processor subscription
default_rule = client.rules.get(
    subscription_name="order-processor",
    rule_name="$Default",
    **topic_location
)

print(f"Default rule filter type: {default_rule.filter_type}")

# Remove the default rule of the filtered-processor subscription so that
# explicit filtering is required
client.rules.delete(
    subscription_name="filtered-processor",
    rule_name="$Default",
    **topic_location
)

# From here on, only messages matching explicit rules reach this subscription

Rule Monitoring and Management

# Enumerate every rule attached to the order-processor subscription
subscription_rules = client.rules.list_by_subscriptions(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="order-processor"
)

print("Rules for order-processor subscription:")
for entry in subscription_rules:
    print(f"- Rule: {entry.name}")
    print(f"  Filter Type: {entry.filter_type}")

    # SQL filter details, when the rule carries one
    sql = entry.sql_filter
    if sql:
        print(f"  SQL Expression: {sql.sql_expression}")

    # Correlation filter details, when the rule carries one
    correlation = entry.correlation_filter
    if correlation:
        filter_props = correlation.properties or {}
        print(f"  Correlation Properties: {filter_props}")
        if correlation.label:
            print(f"  Label: {correlation.label}")

    # Optional action attached to the rule
    rule_action = entry.action
    if rule_action:
        print(f"  Action: {rule_action.sql_expression}")

    # Blank line between rules
    print()

# Look up one rule by name and show its SQL filter
specific_rule = client.rules.get(
    rule_name="HighPriorityFilter",
    subscription_name="high-priority-processor",
    topic_name="order-events",
    namespace_name="my-servicebus-namespace",
    resource_group_name="my-resource-group"
)

print(f"Rule: {specific_rule.name}")
print(f"SQL Filter: {specific_rule.sql_filter.sql_expression}")

Advanced Rule Patterns

# Create a rule that filters based on custom message properties.
# Only constructs from the Service Bus SQL filter grammar are used: comparison
# operators, AND/OR/NOT, IN, LIKE, IS NULL, and EXISTS. T-SQL functions such as
# DATEDIFF() and GETUTCDATE() are not part of that grammar, so relative-time
# conditions cannot be expressed in a filter.
custom_property_filter = SqlFilter(
    sql_expression="""
        user.CustomerId IN ('12345', '67890')
        AND user.OrderAmount > 10000
    """,
    compatibility_level=20
)

# Add an action to enrich the message.
# Actions are a semicolon-separated list of SET / REMOVE statements; the only
# built-in functions are newid() and property(). There is no CASE expression,
# so tiered values (Critical / High / Normal) need one rule per tier, each
# with a non-overlapping filter and a fixed SET.
enrichment_action = SqlRuleAction(
    sql_expression="""
        SET sys.Label = 'HighValueCustomer';
        SET user.Priority = 'Critical';
        SET user.EnrichmentId = newid()
    """
)

advanced_rule = Rule(
    filter_type=FilterType.SQL_FILTER,
    sql_filter=custom_property_filter,
    action=enrichment_action
)

customer_rule = client.rules.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-servicebus-namespace",
    topic_name="order-events",
    subscription_name="premium-customer-processor",
    rule_name="PremiumCustomerEnrichment",
    parameters=advanced_rule
)

Types

class Rule:
    """Rule resource of a topic subscription: a filter plus an optional action.

    Accepted as ``parameters`` by ``create_or_update`` and returned by
    ``create_or_update``, ``get``, and ``list_by_subscriptions``. The examples
    in this document set ``filter_type`` together with the matching filter
    attribute (``sql_filter`` or ``correlation_filter``).
    """
    def __init__(self, **kwargs): ...
    
    # Rule identification
    id: Optional[str]
    name: Optional[str]
    type: Optional[str]
    
    # Rule configuration
    action: Optional[SqlRuleAction]  # Optional SQL action attached to the rule
    filter_type: Optional[Union[str, FilterType]]  # Enum member or its string value
    sql_filter: Optional[SqlFilter]  # Used with FilterType.SQL_FILTER in the examples
    correlation_filter: Optional[CorrelationFilter]  # Used with FilterType.CORRELATION_FILTER in the examples

class SqlFilter:
    """SQL-expression filter assigned to ``Rule.sql_filter``.

    The examples in this document construct it with ``sql_expression`` and
    ``compatibility_level=20``.
    """
    def __init__(self, **kwargs): ...
    
    sql_expression: Optional[str]  # SQL WHERE clause expression
    compatibility_level: Optional[int]  # Always 20
    requires_preprocessing: Optional[bool]  # Read-only

class CorrelationFilter:
    """Property-based filter assigned to ``Rule.correlation_filter``.

    Every attribute is optional; the examples in this document set only the
    attributes they want to filter on.
    """
    def __init__(self, **kwargs): ...
    
    # Custom properties dictionary for filtering
    properties: Optional[Dict[str, str]]
    
    # Standard message properties
    correlation_id: Optional[str]
    message_id: Optional[str]
    to: Optional[str]
    reply_to: Optional[str]
    label: Optional[str]
    session_id: Optional[str]
    reply_to_session_id: Optional[str]
    content_type: Optional[str]

class SqlRuleAction:
    """SQL action assigned to ``Rule.action``.

    The examples in this document construct it with ``sql_expression`` only,
    using ``SET`` statements to modify message properties.
    """
    def __init__(self, **kwargs): ...
    
    sql_expression: Optional[str]  # SQL action expression
    compatibility_level: Optional[int]  # Always 20
    requires_preprocessing: Optional[bool]  # Read-only

from enum import Enum

class FilterType(str, Enum):
    """String-valued enum naming the kind of filter a ``Rule`` uses."""
    SQL_FILTER = "SqlFilter"  # Paired with Rule.sql_filter in the examples
    CORRELATION_FILTER = "CorrelationFilter"  # Paired with Rule.correlation_filter in the examples

Filter Expression Examples

SQL Filter Patterns

# Supported building blocks: =, <>, !=, >, >=, <, <=, AND, OR, NOT,
# [NOT] IN, [NOT] LIKE, IS [NOT] NULL, EXISTS(), arithmetic (+ - * / %),
# and the functions newid() / property(). T-SQL extras such as BETWEEN,
# DATEDIFF(), and GETUTCDATE() are not part of the filter grammar.

# System properties filtering
"sys.MessageId = 'message-123'"
"sys.Label = 'OrderEvent'"
"sys.CorrelationId LIKE 'order-%'"
"sys.ContentType = 'application/json'"
"sys.DeliveryCount > 5"
# NOTE(review): the EnqueuedTimeUtc and TimeToLive entries compare date/time
# values against string literals - confirm against the SQL filter syntax
# reference before relying on them.
"sys.EnqueuedTimeUtc > '2023-01-01T00:00:00Z'"
"sys.SequenceNumber > 1000"
"sys.Size > 1024"
"sys.TimeToLive < '00:30:00'"  # 30 minutes

# User properties filtering
"user.Priority > 5"
"user.Region IN ('US-East', 'US-West')"
"user.Amount >= 100 AND user.Amount <= 1000"  # Range check (BETWEEN is not supported)
"user.CustomerType = 'Premium'"
"user.OrderDate >= '2023-01-01'"

# Complex expressions
"(user.Priority > 8 OR user.VIP = 'true') AND user.Region = 'Europe'"
"user.Amount > 1000 AND sys.Label LIKE '%urgent%'"
"user.Region NOT IN ('Test', 'Staging') AND user.Amount * user.Quantity > 5000"

# Property presence
"EXISTS(user.Discount)"
"user.CouponCode IS NOT NULL"

# Pattern matching
"user.OrderId LIKE 'ORD-%'"
"sys.Label NOT LIKE '%test%'"
"user.Email LIKE '%@company.com'"

SQL Action Patterns

# An action is a list of SET / REMOVE statements separated by semicolons.
# Expressions may use constants, properties, arithmetic, and the functions
# newid() / property(). T-SQL functions and expressions such as GETUTCDATE(),
# DATEDIFF(), LOWER(), SUBSTRING(), and CASE are not part of the action grammar.

# Set properties
"SET sys.Label = 'Processed'"
"SET user.ProcessingRegion = 'US-East'"
"SET user.TrackingId = newid()"

# Multiple statements
"SET user.Priority = 'High'; SET sys.Label = 'Escalated'"

# Conditional values
# There is no CASE expression. Create one rule per tier instead, each with a
# non-overlapping filter and a fixed SET, for example:
#   filter "user.Amount > 10000"                          -> action below
#   filter "user.Amount > 1000 AND user.Amount <= 10000"  -> SET user.Priority = 'High'
"SET user.Priority = 'Critical'"

# Mathematical operations
"SET user.TotalWithTax = user.Amount * 1.08"
"SET user.LineTotal = user.UnitPrice * user.Quantity"

# Remove properties
"REMOVE user.InternalNotes"

Install with Tessl CLI

npx tessl i tessl/pypi-azure-mgmt-servicebus

docs

authorization-security.md

disaster-recovery.md

index.md

message-filtering-rules.md

migration-monitoring.md

namespace-management.md

queue-operations.md

topic-subscription-management.md

tile.json