or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authorization-security.mddisaster-recovery.mdindex.mdmessage-filtering-rules.mdmigration-monitoring.mdnamespace-management.mdqueue-operations.mdtopic-subscription-management.md

message-filtering-rules.mddocs/

0

# Message Filtering Rules

1

2

Message filtering and routing configuration using SQL filters and correlation filters for subscription-based message processing. Rules enable fine-grained control over which messages are delivered to specific subscriptions, supporting complex routing scenarios and message processing patterns.

3

4

## Capabilities

5

6

### Rule Lifecycle Management

7

8

Create, retrieve, update, delete, and list message filtering rules for topic subscriptions.

9

10

```python { .api }

11

def list_by_subscriptions(

12

self,

13

resource_group_name: str,

14

namespace_name: str,

15

topic_name: str,

16

subscription_name: str,

17

skip: Optional[int] = None,

18

top: Optional[int] = None

19

) -> ItemPaged[Rule]:

20

"""List rules for a subscription.

21

22

Args:

23

resource_group_name (str): Name of the resource group.

24

namespace_name (str): Name of the namespace.

25

topic_name (str): Name of the topic.

26

subscription_name (str): Name of the subscription.

27

skip (int, optional): Number of rules to skip.

28

top (int, optional): Maximum number of rules to return.

29

30

Returns:

31

ItemPaged[Rule]: Iterable of rule resources.

32

"""

33

34

def create_or_update(

35

self,

36

resource_group_name: str,

37

namespace_name: str,

38

topic_name: str,

39

subscription_name: str,

40

rule_name: str,

41

parameters: Rule

42

) -> Rule:

43

"""Create or update a subscription rule.

44

45

Args:

46

resource_group_name (str): Name of the resource group.

47

namespace_name (str): Name of the namespace.

48

topic_name (str): Name of the topic.

49

subscription_name (str): Name of the subscription.

50

rule_name (str): Name of the rule.

51

parameters (Rule): Rule configuration parameters.

52

53

Returns:

54

Rule: The created or updated rule.

55

"""

56

57

def get(

58

self,

59

resource_group_name: str,

60

namespace_name: str,

61

topic_name: str,

62

subscription_name: str,

63

rule_name: str

64

) -> Rule:

65

"""Get details of a specific rule.

66

67

Args:

68

resource_group_name (str): Name of the resource group.

69

namespace_name (str): Name of the namespace.

70

topic_name (str): Name of the topic.

71

subscription_name (str): Name of the subscription.

72

rule_name (str): Name of the rule.

73

74

Returns:

75

Rule: The rule resource.

76

"""

77

78

def delete(

79

self,

80

resource_group_name: str,

81

namespace_name: str,

82

topic_name: str,

83

subscription_name: str,

84

rule_name: str

85

) -> None:

86

"""Delete a rule.

87

88

Args:

89

resource_group_name (str): Name of the resource group.

90

namespace_name (str): Name of the namespace.

91

topic_name (str): Name of the topic.

92

subscription_name (str): Name of the subscription.

93

rule_name (str): Name of the rule.

94

"""

95

```

96

97

## Usage Examples

98

99

### Creating SQL Filter Rules

100

101

```python

102

from azure.mgmt.servicebus import ServiceBusManagementClient

103

from azure.mgmt.servicebus.models import Rule, SqlFilter, SqlRuleAction, FilterType

104

from azure.identity import DefaultAzureCredential

105

106

client = ServiceBusManagementClient(DefaultAzureCredential(), subscription_id)

107

108

# Create a rule with SQL filter for order priority

109

priority_filter = SqlFilter(

110

sql_expression="Priority > 5",

111

compatibility_level=20

112

)

113

114

priority_rule = Rule(

115

filter_type=FilterType.SQL_FILTER,

116

sql_filter=priority_filter

117

)

118

119

high_priority_rule = client.rules.create_or_update(

120

resource_group_name="my-resource-group",

121

namespace_name="my-servicebus-namespace",

122

topic_name="order-events",

123

subscription_name="high-priority-processor",

124

rule_name="HighPriorityFilter",

125

parameters=priority_rule

126

)

127

128

# Create a rule with SQL filter and action

129

region_filter = SqlFilter(

130

sql_expression="Region = 'US-East' OR Region = 'US-West'",

131

compatibility_level=20

132

)

133

134

# Add an action to modify message properties

135

region_action = SqlRuleAction(

136

sql_expression="SET ProcessingRegion = 'US'"

137

)

138

139

region_rule = Rule(

140

filter_type=FilterType.SQL_FILTER,

141

sql_filter=region_filter,

142

action=region_action

143

)

144

145

us_region_rule = client.rules.create_or_update(

146

resource_group_name="my-resource-group",

147

namespace_name="my-servicebus-namespace",

148

topic_name="order-events",

149

subscription_name="us-processor",

150

rule_name="USRegionFilter",

151

parameters=region_rule

152

)

153

154

# Create a complex SQL filter rule

155

complex_filter = SqlFilter(

156

sql_expression="""

157

(OrderType = 'Premium' AND Amount > 1000)

158

OR (OrderType = 'Standard' AND Amount > 5000)

159

OR CustomerTier = 'Gold'

160

""",

161

compatibility_level=20

162

)

163

164

complex_rule = Rule(

165

filter_type=FilterType.SQL_FILTER,

166

sql_filter=complex_filter

167

)

168

169

vip_rule = client.rules.create_or_update(

170

resource_group_name="my-resource-group",

171

namespace_name="my-servicebus-namespace",

172

topic_name="order-events",

173

subscription_name="vip-processor",

174

rule_name="VIPCustomerFilter",

175

parameters=complex_rule

176

)

177

```

178

179

### Creating Correlation Filter Rules

180

181

```python

182

from azure.mgmt.servicebus.models import CorrelationFilter

183

184

# Create a correlation filter based on message properties

185

order_correlation_filter = CorrelationFilter(

186

properties={

187

"Department": "Sales",

188

"Region": "Europe"

189

},

190

correlation_id="order-processing",

191

label="OrderEvent"

192

)

193

194

correlation_rule = Rule(

195

filter_type=FilterType.CORRELATION_FILTER,

196

correlation_filter=order_correlation_filter

197

)

198

199

europe_sales_rule = client.rules.create_or_update(

200

resource_group_name="my-resource-group",

201

namespace_name="my-servicebus-namespace",

202

topic_name="order-events",

203

subscription_name="europe-sales-processor",

204

rule_name="EuropeSalesFilter",

205

parameters=correlation_rule

206

)

207

208

# Create a correlation filter with session information

209

session_correlation_filter = CorrelationFilter(

210

session_id="payment-processing",

211

reply_to_session_id="payment-response",

212

message_id="payment-*", # Wildcard pattern

213

content_type="application/json"

214

)

215

216

session_correlation_rule = Rule(

217

filter_type=FilterType.CORRELATION_FILTER,

218

correlation_filter=session_correlation_filter

219

)

220

221

payment_session_rule = client.rules.create_or_update(

222

resource_group_name="my-resource-group",

223

namespace_name="my-servicebus-namespace",

224

topic_name="order-events",

225

subscription_name="payment-processor",

226

rule_name="PaymentSessionFilter",

227

parameters=session_correlation_rule

228

)

229

230

# Create a correlation filter for specific addresses

231

address_correlation_filter = CorrelationFilter(

232

to="order-service",

233

reply_to="response-queue"

234

)

235

236

address_correlation_rule = Rule(

237

filter_type=FilterType.CORRELATION_FILTER,

238

correlation_filter=address_correlation_filter

239

)

240

241

address_rule = client.rules.create_or_update(

242

resource_group_name="my-resource-group",

243

namespace_name="my-servicebus-namespace",

244

topic_name="order-events",

245

subscription_name="order-service-processor",

246

rule_name="AddressFilter",

247

parameters=address_correlation_rule

248

)

249

```

250

251

### Managing Default Rules

252

253

```python

254

# Every subscription has a default rule ($Default) that accepts all messages

255

# You can modify or delete this rule to change the default behavior

256

257

# Get the default rule

258

default_rule = client.rules.get(

259

resource_group_name="my-resource-group",

260

namespace_name="my-servicebus-namespace",

261

topic_name="order-events",

262

subscription_name="order-processor",

263

rule_name="$Default"

264

)

265

266

print(f"Default rule filter type: {default_rule.filter_type}")

267

268

# Delete the default rule to require explicit filtering

269

client.rules.delete(

270

resource_group_name="my-resource-group",

271

namespace_name="my-servicebus-namespace",

272

topic_name="order-events",

273

subscription_name="filtered-processor",

274

rule_name="$Default"

275

)

276

277

# Now only messages matching explicit rules will be delivered to this subscription

278

```

279

280

### Rule Monitoring and Management

281

282

```python

283

# List all rules for a subscription

284

rules = client.rules.list_by_subscriptions(

285

resource_group_name="my-resource-group",

286

namespace_name="my-servicebus-namespace",

287

topic_name="order-events",

288

subscription_name="order-processor"

289

)

290

291

print("Rules for order-processor subscription:")

292

for rule in rules:

293

print(f"- Rule: {rule.name}")

294

print(f" Filter Type: {rule.filter_type}")

295

296

if rule.sql_filter:

297

print(f" SQL Expression: {rule.sql_filter.sql_expression}")

298

299

if rule.correlation_filter:

300

filter_props = rule.correlation_filter.properties or {}

301

print(f" Correlation Properties: {filter_props}")

302

if rule.correlation_filter.label:

303

print(f" Label: {rule.correlation_filter.label}")

304

305

if rule.action:

306

print(f" Action: {rule.action.sql_expression}")

307

308

print()

309

310

# Get specific rule details

311

specific_rule = client.rules.get(

312

resource_group_name="my-resource-group",

313

namespace_name="my-servicebus-namespace",

314

topic_name="order-events",

315

subscription_name="high-priority-processor",

316

rule_name="HighPriorityFilter"

317

)

318

319

print(f"Rule: {specific_rule.name}")

320

print(f"SQL Filter: {specific_rule.sql_filter.sql_expression}")

321

```

322

323

### Advanced Rule Patterns

324

325

```python

326

# Create a rule that filters based on custom message properties

327

custom_property_filter = SqlFilter(

328

sql_expression="""

329

user.CustomerId IN ('12345', '67890')

330

AND sys.EnqueuedTimeUtc > '2023-01-01T00:00:00Z'

331

AND DATEDIFF(hour, sys.EnqueuedTimeUtc, GETUTCDATE()) < 24

332

""",

333

compatibility_level=20

334

)

335

336

# Add an action to enrich the message

337

enrichment_action = SqlRuleAction(

338

sql_expression="""

339

SET sys.Label = 'HighValueCustomer';

340

SET ProcessedAt = GETUTCDATE();

341

SET Priority = CASE

342

WHEN user.OrderAmount > 10000 THEN 'Critical'

343

WHEN user.OrderAmount > 1000 THEN 'High'

344

ELSE 'Normal'

345

END

346

"""

347

)

348

349

advanced_rule = Rule(

350

filter_type=FilterType.SQL_FILTER,

351

sql_filter=custom_property_filter,

352

action=enrichment_action

353

)

354

355

customer_rule = client.rules.create_or_update(

356

resource_group_name="my-resource-group",

357

namespace_name="my-servicebus-namespace",

358

topic_name="order-events",

359

subscription_name="premium-customer-processor",

360

rule_name="PremiumCustomerEnrichment",

361

parameters=advanced_rule

362

)

363

```

364

365

## Types

366

367

```python { .api }

368

class Rule:

369

def __init__(self, **kwargs): ...

370

371

# Rule identification

372

id: Optional[str]

373

name: Optional[str]

374

type: Optional[str]

375

376

# Rule configuration

377

action: Optional[SqlRuleAction]

378

filter_type: Optional[Union[str, FilterType]]

379

sql_filter: Optional[SqlFilter]

380

correlation_filter: Optional[CorrelationFilter]

381

382

class SqlFilter:

383

def __init__(self, **kwargs): ...

384

385

sql_expression: Optional[str] # SQL WHERE clause expression

386

compatibility_level: Optional[int] # Always 20

387

requires_preprocessing: Optional[bool] # Read-only

388

389

class CorrelationFilter:

390

def __init__(self, **kwargs): ...

391

392

# Custom properties dictionary for filtering

393

properties: Optional[Dict[str, str]]

394

395

# Standard message properties

396

correlation_id: Optional[str]

397

message_id: Optional[str]

398

to: Optional[str]

399

reply_to: Optional[str]

400

label: Optional[str]

401

session_id: Optional[str]

402

reply_to_session_id: Optional[str]

403

content_type: Optional[str]

404

405

class SqlRuleAction:

406

def __init__(self, **kwargs): ...

407

408

sql_expression: Optional[str] # SQL action expression

409

compatibility_level: Optional[int] # Always 20

410

requires_preprocessing: Optional[bool] # Read-only

411

412

from enum import Enum

413

414

class FilterType(str, Enum):

415

SQL_FILTER = "SqlFilter"

416

CORRELATION_FILTER = "CorrelationFilter"

417

```

418

419

## Filter Expression Examples

420

421

### SQL Filter Patterns

422

423

```python

424

# System properties filtering

425

"sys.MessageId = 'message-123'"

426

"sys.Label = 'OrderEvent'"

427

"sys.CorrelationId LIKE 'order-%'"

428

"sys.ContentType = 'application/json'"

429

"sys.DeliveryCount > 5"

430

"sys.EnqueuedTimeUtc > '2023-01-01T00:00:00Z'"

431

"sys.SequenceNumber > 1000"

432

"sys.Size > 1024"

433

"sys.TimeToLive < '00:30:00'" # 30 minutes

434

435

# User properties filtering

436

"user.Priority > 5"

437

"user.Region IN ('US-East', 'US-West')"

438

"user.Amount BETWEEN 100 AND 1000"

439

"user.CustomerType = 'Premium'"

440

"user.OrderDate >= '2023-01-01'"

441

442

# Complex expressions

443

"(user.Priority > 8 OR user.VIP = 'true') AND user.Region = 'Europe'"

444

"user.Amount > 1000 AND sys.Label LIKE '%urgent%'"

445

"DATEDIFF(hour, sys.EnqueuedTimeUtc, GETUTCDATE()) < 6"

446

447

# Pattern matching

448

"user.OrderId LIKE 'ORD-%'"

449

"sys.Label NOT LIKE '%test%'"

450

"user.Email LIKE '%@company.com'"

451

```

452

453

### SQL Action Patterns

454

455

```python

456

# Set properties

457

"SET sys.Label = 'Processed'"

458

"SET user.ProcessedAt = GETUTCDATE()"

459

"SET user.ProcessingRegion = 'US-East'"

460

461

# Conditional actions

462

"""

463

SET user.Priority = CASE

464

WHEN user.Amount > 10000 THEN 'Critical'

465

WHEN user.Amount > 1000 THEN 'High'

466

ELSE 'Normal'

467

END

468

"""

469

470

# Mathematical operations

471

"SET user.TotalWithTax = user.Amount * 1.08"

472

"SET user.DaysOld = DATEDIFF(day, user.CreatedDate, GETUTCDATE())"

473

474

# String operations

475

"SET user.NormalizedEmail = LOWER(user.Email)"

476

"SET user.ShortId = SUBSTRING(user.LongId, 1, 8)"

477

```