0
# Message Filtering Rules
1
2
Message filtering and routing configuration using SQL filters and correlation filters for subscription-based message processing. Rules enable fine-grained control over which messages are delivered to specific subscriptions, supporting complex routing scenarios and message processing patterns.
3
4
## Capabilities
5
6
### Rule Lifecycle Management
7
8
Create, retrieve, update, delete, and list message filtering rules for topic subscriptions.
9
10
```python { .api }
11
def list_by_subscriptions(
12
self,
13
resource_group_name: str,
14
namespace_name: str,
15
topic_name: str,
16
subscription_name: str,
17
skip: Optional[int] = None,
18
top: Optional[int] = None
19
) -> ItemPaged[Rule]:
20
"""List rules for a subscription.
21
22
Args:
23
resource_group_name (str): Name of the resource group.
24
namespace_name (str): Name of the namespace.
25
topic_name (str): Name of the topic.
26
subscription_name (str): Name of the subscription.
27
skip (int, optional): Number of rules to skip.
28
top (int, optional): Maximum number of rules to return.
29
30
Returns:
31
ItemPaged[Rule]: Iterable of rule resources.
32
"""
33
34
def create_or_update(
35
self,
36
resource_group_name: str,
37
namespace_name: str,
38
topic_name: str,
39
subscription_name: str,
40
rule_name: str,
41
parameters: Rule
42
) -> Rule:
43
"""Create or update a subscription rule.
44
45
Args:
46
resource_group_name (str): Name of the resource group.
47
namespace_name (str): Name of the namespace.
48
topic_name (str): Name of the topic.
49
subscription_name (str): Name of the subscription.
50
rule_name (str): Name of the rule.
51
parameters (Rule): Rule configuration parameters.
52
53
Returns:
54
Rule: The created or updated rule.
55
"""
56
57
def get(
58
self,
59
resource_group_name: str,
60
namespace_name: str,
61
topic_name: str,
62
subscription_name: str,
63
rule_name: str
64
) -> Rule:
65
"""Get details of a specific rule.
66
67
Args:
68
resource_group_name (str): Name of the resource group.
69
namespace_name (str): Name of the namespace.
70
topic_name (str): Name of the topic.
71
subscription_name (str): Name of the subscription.
72
rule_name (str): Name of the rule.
73
74
Returns:
75
Rule: The rule resource.
76
"""
77
78
def delete(
79
self,
80
resource_group_name: str,
81
namespace_name: str,
82
topic_name: str,
83
subscription_name: str,
84
rule_name: str
85
) -> None:
86
"""Delete a rule.
87
88
Args:
89
resource_group_name (str): Name of the resource group.
90
namespace_name (str): Name of the namespace.
91
topic_name (str): Name of the topic.
92
subscription_name (str): Name of the subscription.
93
rule_name (str): Name of the rule.
94
"""
95
```
96
97
## Usage Examples
98
99
### Creating SQL Filter Rules
100
101
```python
102
from azure.mgmt.servicebus import ServiceBusManagementClient
103
from azure.mgmt.servicebus.models import Rule, SqlFilter, SqlRuleAction, FilterType
104
from azure.identity import DefaultAzureCredential
105
106
client = ServiceBusManagementClient(DefaultAzureCredential(), subscription_id)
107
108
# Create a rule with SQL filter for order priority
109
priority_filter = SqlFilter(
110
sql_expression="Priority > 5",
111
compatibility_level=20
112
)
113
114
priority_rule = Rule(
115
filter_type=FilterType.SQL_FILTER,
116
sql_filter=priority_filter
117
)
118
119
high_priority_rule = client.rules.create_or_update(
120
resource_group_name="my-resource-group",
121
namespace_name="my-servicebus-namespace",
122
topic_name="order-events",
123
subscription_name="high-priority-processor",
124
rule_name="HighPriorityFilter",
125
parameters=priority_rule
126
)
127
128
# Create a rule with SQL filter and action
129
region_filter = SqlFilter(
130
sql_expression="Region = 'US-East' OR Region = 'US-West'",
131
compatibility_level=20
132
)
133
134
# Add an action to modify message properties
135
region_action = SqlRuleAction(
136
sql_expression="SET ProcessingRegion = 'US'"
137
)
138
139
region_rule = Rule(
140
filter_type=FilterType.SQL_FILTER,
141
sql_filter=region_filter,
142
action=region_action
143
)
144
145
us_region_rule = client.rules.create_or_update(
146
resource_group_name="my-resource-group",
147
namespace_name="my-servicebus-namespace",
148
topic_name="order-events",
149
subscription_name="us-processor",
150
rule_name="USRegionFilter",
151
parameters=region_rule
152
)
153
154
# Create a complex SQL filter rule
155
complex_filter = SqlFilter(
156
sql_expression="""
157
(OrderType = 'Premium' AND Amount > 1000)
158
OR (OrderType = 'Standard' AND Amount > 5000)
159
OR CustomerTier = 'Gold'
160
""",
161
compatibility_level=20
162
)
163
164
complex_rule = Rule(
165
filter_type=FilterType.SQL_FILTER,
166
sql_filter=complex_filter
167
)
168
169
vip_rule = client.rules.create_or_update(
170
resource_group_name="my-resource-group",
171
namespace_name="my-servicebus-namespace",
172
topic_name="order-events",
173
subscription_name="vip-processor",
174
rule_name="VIPCustomerFilter",
175
parameters=complex_rule
176
)
177
```
178
179
### Creating Correlation Filter Rules
180
181
```python
182
from azure.mgmt.servicebus.models import CorrelationFilter
183
184
# Create a correlation filter based on message properties
185
order_correlation_filter = CorrelationFilter(
186
properties={
187
"Department": "Sales",
188
"Region": "Europe"
189
},
190
correlation_id="order-processing",
191
label="OrderEvent"
192
)
193
194
correlation_rule = Rule(
195
filter_type=FilterType.CORRELATION_FILTER,
196
correlation_filter=order_correlation_filter
197
)
198
199
europe_sales_rule = client.rules.create_or_update(
200
resource_group_name="my-resource-group",
201
namespace_name="my-servicebus-namespace",
202
topic_name="order-events",
203
subscription_name="europe-sales-processor",
204
rule_name="EuropeSalesFilter",
205
parameters=correlation_rule
206
)
207
208
# Create a correlation filter with session information
209
session_correlation_filter = CorrelationFilter(
210
session_id="payment-processing",
211
reply_to_session_id="payment-response",
212
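message_id="payment-*",  # NOTE: correlation filters match by exact equality; "*" is compared literally, not as a wildcard (use a SQL filter with LIKE for patterns)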
213
content_type="application/json"
214
)
215
216
session_correlation_rule = Rule(
217
filter_type=FilterType.CORRELATION_FILTER,
218
correlation_filter=session_correlation_filter
219
)
220
221
payment_session_rule = client.rules.create_or_update(
222
resource_group_name="my-resource-group",
223
namespace_name="my-servicebus-namespace",
224
topic_name="order-events",
225
subscription_name="payment-processor",
226
rule_name="PaymentSessionFilter",
227
parameters=session_correlation_rule
228
)
229
230
# Create a correlation filter for specific addresses
231
address_correlation_filter = CorrelationFilter(
232
to="order-service",
233
reply_to="response-queue"
234
)
235
236
address_correlation_rule = Rule(
237
filter_type=FilterType.CORRELATION_FILTER,
238
correlation_filter=address_correlation_filter
239
)
240
241
address_rule = client.rules.create_or_update(
242
resource_group_name="my-resource-group",
243
namespace_name="my-servicebus-namespace",
244
topic_name="order-events",
245
subscription_name="order-service-processor",
246
rule_name="AddressFilter",
247
parameters=address_correlation_rule
248
)
249
```
250
251
### Managing Default Rules
252
253
```python
254
# Every subscription has a default rule ($Default) that accepts all messages
255
# You can modify or delete this rule to change the default behavior
256
257
# Get the default rule
258
default_rule = client.rules.get(
259
resource_group_name="my-resource-group",
260
namespace_name="my-servicebus-namespace",
261
topic_name="order-events",
262
subscription_name="order-processor",
263
rule_name="$Default"
264
)
265
266
print(f"Default rule filter type: {default_rule.filter_type}")
267
268
# Delete the default rule to require explicit filtering
269
client.rules.delete(
270
resource_group_name="my-resource-group",
271
namespace_name="my-servicebus-namespace",
272
topic_name="order-events",
273
subscription_name="filtered-processor",
274
rule_name="$Default"
275
)
276
277
# Now only messages matching explicit rules will be delivered to this subscription;
# if the subscription has no rules at all, it receives no messages
278
```
279
280
### Rule Monitoring and Management
281
282
```python
283
# List all rules for a subscription
284
rules = client.rules.list_by_subscriptions(
285
resource_group_name="my-resource-group",
286
namespace_name="my-servicebus-namespace",
287
topic_name="order-events",
288
subscription_name="order-processor"
289
)
290
291
print("Rules for order-processor subscription:")
292
for rule in rules:
293
print(f"- Rule: {rule.name}")
294
print(f" Filter Type: {rule.filter_type}")
295
296
if rule.sql_filter:
297
print(f" SQL Expression: {rule.sql_filter.sql_expression}")
298
299
if rule.correlation_filter:
300
filter_props = rule.correlation_filter.properties or {}
301
print(f" Correlation Properties: {filter_props}")
302
if rule.correlation_filter.label:
303
print(f" Label: {rule.correlation_filter.label}")
304
305
if rule.action:
306
print(f" Action: {rule.action.sql_expression}")
307
308
print()
309
310
# Get specific rule details
311
specific_rule = client.rules.get(
312
resource_group_name="my-resource-group",
313
namespace_name="my-servicebus-namespace",
314
topic_name="order-events",
315
subscription_name="high-priority-processor",
316
rule_name="HighPriorityFilter"
317
)
318
319
print(f"Rule: {specific_rule.name}")
320
print(f"SQL Filter: {specific_rule.sql_filter.sql_expression}")
321
```
322
323
### Advanced Rule Patterns
324
325
```python
326
# Create a rule that filters based on custom message properties
327
custom_property_filter = SqlFilter(
328
sql_expression="""
329
user.CustomerId IN ('12345', '67890')
330
AND sys.EnqueuedTimeUtc > '2023-01-01T00:00:00Z'
331
AND DATEDIFF(hour, sys.EnqueuedTimeUtc, GETUTCDATE()) < 24
332
""",
333
compatibility_level=20
334
)
335
336
# Add an action to enrich the message
337
enrichment_action = SqlRuleAction(
338
sql_expression="""
339
SET sys.Label = 'HighValueCustomer';
340
SET ProcessedAt = GETUTCDATE();
341
SET Priority = CASE
342
WHEN user.OrderAmount > 10000 THEN 'Critical'
343
WHEN user.OrderAmount > 1000 THEN 'High'
344
ELSE 'Normal'
345
END
346
"""
347
)
348
349
advanced_rule = Rule(
350
filter_type=FilterType.SQL_FILTER,
351
sql_filter=custom_property_filter,
352
action=enrichment_action
353
)
354
355
customer_rule = client.rules.create_or_update(
356
resource_group_name="my-resource-group",
357
namespace_name="my-servicebus-namespace",
358
topic_name="order-events",
359
subscription_name="premium-customer-processor",
360
rule_name="PremiumCustomerEnrichment",
361
parameters=advanced_rule
362
)
363
```
364
365
## Types
366
367
```python { .api }
368
class Rule:
369
def __init__(self, **kwargs): ...
370
371
# Rule identification
372
id: Optional[str]
373
name: Optional[str]
374
type: Optional[str]
375
376
# Rule configuration
377
action: Optional[SqlRuleAction]
378
filter_type: Optional[Union[str, FilterType]]
379
sql_filter: Optional[SqlFilter]
380
correlation_filter: Optional[CorrelationFilter]
381
382
class SqlFilter:
383
def __init__(self, **kwargs): ...
384
385
sql_expression: Optional[str] # SQL WHERE clause expression
386
compatibility_level: Optional[int] # Always 20
387
requires_preprocessing: Optional[bool] # Read-only
388
389
class CorrelationFilter:
390
def __init__(self, **kwargs): ...
391
392
# Custom properties dictionary for filtering
393
properties: Optional[Dict[str, str]]
394
395
# Standard message properties
396
correlation_id: Optional[str]
397
message_id: Optional[str]
398
to: Optional[str]
399
reply_to: Optional[str]
400
label: Optional[str]
401
session_id: Optional[str]
402
reply_to_session_id: Optional[str]
403
content_type: Optional[str]
404
405
class SqlRuleAction:
406
def __init__(self, **kwargs): ...
407
408
sql_expression: Optional[str] # SQL action expression
409
compatibility_level: Optional[int] # Always 20
410
requires_preprocessing: Optional[bool] # Read-only
411
412
from enum import Enum
413
414
class FilterType(str, Enum):
415
SQL_FILTER = "SqlFilter"
416
CORRELATION_FILTER = "CorrelationFilter"
417
```
418
419
## Filter Expression Examples
420
421
### SQL Filter Patterns
422
423
```python
424
# System properties filtering
425
"sys.MessageId = 'message-123'"
426
"sys.Label = 'OrderEvent'"
427
"sys.CorrelationId LIKE 'order-%'"
428
"sys.ContentType = 'application/json'"
429
"sys.DeliveryCount > 5"
430
"sys.EnqueuedTimeUtc > '2023-01-01T00:00:00Z'"
431
"sys.SequenceNumber > 1000"
432
"sys.Size > 1024"
433
"sys.TimeToLive < '00:30:00'" # 30 minutes
434
435
# User properties filtering
436
"user.Priority > 5"
437
"user.Region IN ('US-East', 'US-West')"
438
"user.Amount BETWEEN 100 AND 1000"
439
"user.CustomerType = 'Premium'"
440
"user.OrderDate >= '2023-01-01'"
441
442
# Complex expressions
443
"(user.Priority > 8 OR user.VIP = 'true') AND user.Region = 'Europe'"
444
"user.Amount > 1000 AND sys.Label LIKE '%urgent%'"
445
"DATEDIFF(hour, sys.EnqueuedTimeUtc, GETUTCDATE()) < 6"
446
447
# Pattern matching
448
"user.OrderId LIKE 'ORD-%'"
449
"sys.Label NOT LIKE '%test%'"
450
"user.Email LIKE '%@company.com'"
451
```
452
453
### SQL Action Patterns
454
455
```python
456
# Set properties
457
"SET sys.Label = 'Processed'"
458
"SET user.ProcessedAt = GETUTCDATE()"
459
"SET user.ProcessingRegion = 'US-East'"
460
461
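# Conditional actions
# NOTE(review): verify CASE ... END against the Service Bus SQL rule action syntax before use;
# the documented grammar lists only SET/REMOVE statements with arithmetic expressions, newid() and property()/p()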
462
"""
463
SET user.Priority = CASE
464
WHEN user.Amount > 10000 THEN 'Critical'
465
WHEN user.Amount > 1000 THEN 'High'
466
ELSE 'Normal'
467
END
468
"""
469
470
# Mathematical operations
471
"SET user.TotalWithTax = user.Amount * 1.08"
472
"SET user.DaysOld = DATEDIFF(day, user.CreatedDate, GETUTCDATE())"
473
474
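# String operations
# NOTE(review): confirm LOWER/SUBSTRING are supported by the Service Bus SQL rule action syntax before use;
# the documented function list appears limited to newid() and property()/p()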
475
"SET user.NormalizedEmail = LOWER(user.Email)"
476
"SET user.ShortId = SUBSTRING(user.LongId, 1, 8)"
477
```