0
# Topic and Subscription Management
1
2
Topic creation and management, subscription handling, and message filtering rules for publish-subscribe messaging patterns. Topics enable one-to-many communication where messages published to a topic are delivered to multiple subscribing applications through subscriptions.
3
4
## Capabilities
5
6
### Topic Lifecycle Management
7
8
Create, retrieve, update, delete, and list Service Bus topics within a namespace.
9
10
```python { .api }
11
def list_by_namespace(
12
self,
13
resource_group_name: str,
14
namespace_name: str,
15
skip: Optional[int] = None,
16
top: Optional[int] = None
17
) -> ItemPaged[SBTopic]:
18
"""List topics in a namespace.
19
20
Args:
21
resource_group_name (str): Name of the resource group.
22
namespace_name (str): Name of the namespace.
23
skip (int, optional): Number of topics to skip.
24
top (int, optional): Maximum number of topics to return.
25
26
Returns:
27
ItemPaged[SBTopic]: Iterable of topic resources.
28
"""
29
30
def create_or_update(
31
self,
32
resource_group_name: str,
33
namespace_name: str,
34
topic_name: str,
35
parameters: SBTopic
36
) -> SBTopic:
37
"""Create or update a Service Bus topic.
38
39
Args:
40
resource_group_name (str): Name of the resource group.
41
namespace_name (str): Name of the namespace.
42
topic_name (str): Name of the topic.
43
parameters (SBTopic): Topic configuration parameters.
44
45
Returns:
46
SBTopic: The created or updated topic.
47
"""
48
49
def get(
50
self,
51
resource_group_name: str,
52
namespace_name: str,
53
topic_name: str
54
) -> SBTopic:
55
"""Get details of a specific topic.
56
57
Args:
58
resource_group_name (str): Name of the resource group.
59
namespace_name (str): Name of the namespace.
60
topic_name (str): Name of the topic.
61
62
Returns:
63
SBTopic: The topic resource.
64
"""
65
66
def delete(
67
self,
68
resource_group_name: str,
69
namespace_name: str,
70
topic_name: str
71
) -> None:
72
"""Delete a topic.
73
74
Args:
75
resource_group_name (str): Name of the resource group.
76
namespace_name (str): Name of the namespace.
77
topic_name (str): Name of the topic.
78
"""
79
```
80
81
### Topic Authorization Rules
82
83
Manage authorization rules for topic-level access control with specific rights (Send, Listen, Manage).
84
85
```python { .api }
86
def list_authorization_rules(
87
self,
88
resource_group_name: str,
89
namespace_name: str,
90
topic_name: str
91
) -> ItemPaged[SBAuthorizationRule]:
92
"""List authorization rules for a topic.
93
94
Args:
95
resource_group_name (str): Name of the resource group.
96
namespace_name (str): Name of the namespace.
97
topic_name (str): Name of the topic.
98
99
Returns:
100
ItemPaged[SBAuthorizationRule]: Iterable of authorization rules.
101
"""
102
103
def create_or_update_authorization_rule(
104
self,
105
resource_group_name: str,
106
namespace_name: str,
107
topic_name: str,
108
authorization_rule_name: str,
109
parameters: SBAuthorizationRule
110
) -> SBAuthorizationRule:
111
"""Create or update an authorization rule for a topic.
112
113
Args:
114
resource_group_name (str): Name of the resource group.
115
namespace_name (str): Name of the namespace.
116
topic_name (str): Name of the topic.
117
authorization_rule_name (str): Name of the authorization rule.
118
parameters (SBAuthorizationRule): Authorization rule parameters.
119
120
Returns:
121
SBAuthorizationRule: The created or updated authorization rule.
122
"""
123
124
def get_authorization_rule(
125
self,
126
resource_group_name: str,
127
namespace_name: str,
128
topic_name: str,
129
authorization_rule_name: str
130
) -> SBAuthorizationRule:
131
"""Get an authorization rule for a topic.
132
133
Args:
134
resource_group_name (str): Name of the resource group.
135
namespace_name (str): Name of the namespace.
136
topic_name (str): Name of the topic.
137
authorization_rule_name (str): Name of the authorization rule.
138
139
Returns:
140
SBAuthorizationRule: The authorization rule.
141
"""
142
143
def delete_authorization_rule(
144
self,
145
resource_group_name: str,
146
namespace_name: str,
147
topic_name: str,
148
authorization_rule_name: str
149
) -> None:
150
"""Delete an authorization rule for a topic.
151
152
Args:
153
resource_group_name (str): Name of the resource group.
154
namespace_name (str): Name of the namespace.
155
topic_name (str): Name of the topic.
156
authorization_rule_name (str): Name of the authorization rule.
157
"""
158
```
159
160
### Topic Access Key Management
161
162
Generate and manage access keys for topic-specific authentication.
163
164
```python { .api }
165
def list_keys(
166
self,
167
resource_group_name: str,
168
namespace_name: str,
169
topic_name: str,
170
authorization_rule_name: str
171
) -> AccessKeys:
172
"""Get access keys for a topic authorization rule.
173
174
Args:
175
resource_group_name (str): Name of the resource group.
176
namespace_name (str): Name of the namespace.
177
topic_name (str): Name of the topic.
178
authorization_rule_name (str): Name of the authorization rule.
179
180
Returns:
181
AccessKeys: Primary and secondary keys with connection strings.
182
"""
183
184
def regenerate_keys(
185
self,
186
resource_group_name: str,
187
namespace_name: str,
188
topic_name: str,
189
authorization_rule_name: str,
190
parameters: RegenerateAccessKeyParameters
191
) -> AccessKeys:
192
"""Regenerate access keys for a topic authorization rule.
193
194
Args:
195
resource_group_name (str): Name of the resource group.
196
namespace_name (str): Name of the namespace.
197
topic_name (str): Name of the topic.
198
authorization_rule_name (str): Name of the authorization rule.
199
parameters (RegenerateAccessKeyParameters): Regeneration parameters.
200
201
Returns:
202
AccessKeys: New keys with connection strings.
203
"""
204
```
205
206
### Subscription Lifecycle Management
207
208
Create, retrieve, update, delete, and list subscriptions for topics to enable message consumption.
209
210
```python { .api }
211
def list_by_topic(
212
self,
213
resource_group_name: str,
214
namespace_name: str,
215
topic_name: str,
216
skip: Optional[int] = None,
217
top: Optional[int] = None
218
) -> ItemPaged[SBSubscription]:
219
"""List subscriptions for a topic.
220
221
Args:
222
resource_group_name (str): Name of the resource group.
223
namespace_name (str): Name of the namespace.
224
topic_name (str): Name of the topic.
225
skip (int, optional): Number of subscriptions to skip.
226
top (int, optional): Maximum number of subscriptions to return.
227
228
Returns:
229
ItemPaged[SBSubscription]: Iterable of subscription resources.
230
"""
231
232
def create_or_update(
233
self,
234
resource_group_name: str,
235
namespace_name: str,
236
topic_name: str,
237
subscription_name: str,
238
parameters: SBSubscription
239
) -> SBSubscription:
240
"""Create or update a subscription.
241
242
Args:
243
resource_group_name (str): Name of the resource group.
244
namespace_name (str): Name of the namespace.
245
topic_name (str): Name of the topic.
246
subscription_name (str): Name of the subscription.
247
parameters (SBSubscription): Subscription configuration parameters.
248
249
Returns:
250
SBSubscription: The created or updated subscription.
251
"""
252
253
def get(
254
self,
255
resource_group_name: str,
256
namespace_name: str,
257
topic_name: str,
258
subscription_name: str
259
) -> SBSubscription:
260
"""Get details of a specific subscription.
261
262
Args:
263
resource_group_name (str): Name of the resource group.
264
namespace_name (str): Name of the namespace.
265
topic_name (str): Name of the topic.
266
subscription_name (str): Name of the subscription.
267
268
Returns:
269
SBSubscription: The subscription resource.
270
"""
271
272
def delete(
273
self,
274
resource_group_name: str,
275
namespace_name: str,
276
topic_name: str,
277
subscription_name: str
278
) -> None:
279
"""Delete a subscription.
280
281
Args:
282
resource_group_name (str): Name of the resource group.
283
namespace_name (str): Name of the namespace.
284
topic_name (str): Name of the topic.
285
subscription_name (str): Name of the subscription.
286
"""
287
```
288
289
## Usage Examples
290
291
### Creating a Basic Topic
292
293
```python
294
from azure.mgmt.servicebus import ServiceBusManagementClient
295
from azure.mgmt.servicebus.models import SBTopic
296
from azure.identity import DefaultAzureCredential
297
from datetime import timedelta
298
299
client = ServiceBusManagementClient(DefaultAzureCredential(), subscription_id)
300
301
# Create a basic topic
302
topic_params = SBTopic(
303
max_size_in_megabytes=2048, # 2GB
304
default_message_time_to_live=timedelta(days=7) # 7 days TTL
305
)
306
307
topic = client.topics.create_or_update(
308
resource_group_name="my-resource-group",
309
namespace_name="my-servicebus-namespace",
310
topic_name="order-events",
311
parameters=topic_params
312
)
313
314
print(f"Topic created: {topic.name}")
315
print(f"Max size: {topic.max_size_in_megabytes} MB")
316
```
317
318
### Creating a Topic with Advanced Features
319
320
```python
321
from azure.mgmt.servicebus.models import EntityStatus
322
323
# Create a topic with advanced messaging features
324
advanced_topic_params = SBTopic(
325
max_size_in_megabytes=5120, # 5GB
326
requires_duplicate_detection=True, # Enable duplicate detection
327
duplicate_detection_history_time_window=timedelta(minutes=10),
328
enable_batched_operations=True, # Enable server-side batching
329
support_ordering=True, # Enable message ordering
330
auto_delete_on_idle=timedelta(days=30), # Auto-delete if idle for 30 days
331
enable_partitioning=True, # Enable partitioning for higher throughput
332
enable_express=False, # Disable express mode for durability
333
status=EntityStatus.ACTIVE,
334
default_message_time_to_live=timedelta(days=14)
335
)
336
337
advanced_topic = client.topics.create_or_update(
338
resource_group_name="my-resource-group",
339
namespace_name="my-servicebus-namespace",
340
topic_name="high-volume-events",
341
parameters=advanced_topic_params
342
)
343
```
344
345
### Creating Subscriptions for a Topic
346
347
```python
348
from azure.mgmt.servicebus.models import SBSubscription
349
350
# Create a basic subscription
351
basic_subscription_params = SBSubscription(
352
lock_duration=timedelta(minutes=1), # Lock duration for message processing
353
max_delivery_count=10, # Max delivery attempts before dead lettering
354
dead_lettering_on_message_expiration=True, # Dead letter expired messages
355
enable_batched_operations=True,
356
default_message_time_to_live=timedelta(days=7)
357
)
358
359
basic_subscription = client.subscriptions.create_or_update(
360
resource_group_name="my-resource-group",
361
namespace_name="my-servicebus-namespace",
362
topic_name="order-events",
363
subscription_name="order-processor",
364
parameters=basic_subscription_params
365
)
366
367
# Create a session-enabled subscription for ordered processing
368
session_subscription_params = SBSubscription(
369
requires_session=True, # Enable message sessions
370
lock_duration=timedelta(minutes=5),
371
max_delivery_count=5,
372
dead_lettering_on_filter_evaluation_exceptions=True, # Dead letter on filter errors
373
dead_lettering_on_message_expiration=True,
374
default_message_time_to_live=timedelta(days=3)
375
)
376
377
session_subscription = client.subscriptions.create_or_update(
378
resource_group_name="my-resource-group",
379
namespace_name="my-servicebus-namespace",
380
topic_name="order-events",
381
subscription_name="sequential-processor",
382
parameters=session_subscription_params
383
)
384
385
# Create a subscription with message forwarding
386
forwarding_subscription_params = SBSubscription(
387
forward_to="processed-orders-queue", # Forward messages to a queue
388
forward_dead_lettered_messages_to="error-queue", # Forward dead letters
389
max_delivery_count=3,
390
dead_lettering_on_message_expiration=True
391
)
392
393
forwarding_subscription = client.subscriptions.create_or_update(
394
resource_group_name="my-resource-group",
395
namespace_name="my-servicebus-namespace",
396
topic_name="order-events",
397
subscription_name="forwarding-processor",
398
parameters=forwarding_subscription_params
399
)
400
```
401
402
### Managing Topic Authorization Rules
403
404
```python
405
from azure.mgmt.servicebus.models import SBAuthorizationRule, AccessRights
406
407
# Create a send-only authorization rule for publishers
408
send_rule = SBAuthorizationRule(
409
rights=[AccessRights.SEND]
410
)
411
412
publisher_auth = client.topics.create_or_update_authorization_rule(
413
resource_group_name="my-resource-group",
414
namespace_name="my-servicebus-namespace",
415
topic_name="order-events",
416
authorization_rule_name="PublisherPolicy",
417
parameters=send_rule
418
)
419
420
# Create a manage rule for administrative access
421
manage_rule = SBAuthorizationRule(
422
rights=[AccessRights.MANAGE, AccessRights.SEND, AccessRights.LISTEN]
423
)
424
425
admin_auth = client.topics.create_or_update_authorization_rule(
426
resource_group_name="my-resource-group",
427
namespace_name="my-servicebus-namespace",
428
topic_name="order-events",
429
authorization_rule_name="AdminPolicy",
430
parameters=manage_rule
431
)
432
433
# Get connection strings
434
publisher_keys = client.topics.list_keys(
435
resource_group_name="my-resource-group",
436
namespace_name="my-servicebus-namespace",
437
topic_name="order-events",
438
authorization_rule_name="PublisherPolicy"
439
)
440
441
print(f"Publisher connection: {publisher_keys.primary_connection_string}")
442
```
443
444
### Topic and Subscription Monitoring
445
446
```python
447
# Get topic statistics
448
topic_info = client.topics.get(
449
resource_group_name="my-resource-group",
450
namespace_name="my-servicebus-namespace",
451
topic_name="order-events"
452
)
453
454
print(f"Topic: {topic_info.name}")
455
print(f"Status: {topic_info.status}")
456
print(f"Subscription count: {topic_info.subscription_count}")
457
print(f"Size in bytes: {topic_info.size_in_bytes}")
458
print(f"Created: {topic_info.created_at}")
459
460
if topic_info.count_details:
461
details = topic_info.count_details
462
print(f"Active messages: {details.active_message_count}")
463
464
# Get subscription statistics
465
subscription_info = client.subscriptions.get(
466
resource_group_name="my-resource-group",
467
namespace_name="my-servicebus-namespace",
468
topic_name="order-events",
469
subscription_name="order-processor"
470
)
471
472
print(f"Subscription: {subscription_info.name}")
473
print(f"Message count: {subscription_info.message_count}")
474
print(f"Status: {subscription_info.status}")
475
476
if subscription_info.count_details:
477
details = subscription_info.count_details
478
print(f"Active messages: {details.active_message_count}")
479
print(f"Dead letter messages: {details.dead_letter_message_count}")
480
```
481
482
## Types
483
484
```python { .api }
485
class SBTopic:
486
def __init__(self, **kwargs): ...
487
488
# Topic statistics (read-only)
489
size_in_bytes: Optional[int]
490
created_at: Optional[datetime]
491
updated_at: Optional[datetime]
492
accessed_at: Optional[datetime]
493
subscription_count: Optional[int]
494
count_details: Optional[MessageCountDetails]
495
496
# Topic configuration
497
default_message_time_to_live: Optional[timedelta] # Default: TimeSpan.Max
498
max_size_in_megabytes: Optional[int] # Default: 1024 MB
499
max_message_size_in_kilobytes: Optional[int] # Premium only, default: 1024 KB
500
requires_duplicate_detection: Optional[bool] # Default: False
501
duplicate_detection_history_time_window: Optional[timedelta] # Default: 10 minutes
502
enable_batched_operations: Optional[bool] # Default: True
503
status: Optional[Union[str, EntityStatus]] # Default: Active
504
support_ordering: Optional[bool] # Default: False
505
auto_delete_on_idle: Optional[timedelta] # Minimum: 5 minutes
506
enable_partitioning: Optional[bool] # Default: False
507
enable_express: Optional[bool] # Default: False
508
509
class SBSubscription:
510
def __init__(self, **kwargs): ...
511
512
# Subscription statistics (read-only)
513
message_count: Optional[int]
514
created_at: Optional[datetime]
515
accessed_at: Optional[datetime]
516
updated_at: Optional[datetime]
517
count_details: Optional[MessageCountDetails]
518
519
# Subscription configuration
520
lock_duration: Optional[timedelta] # Default: 1 minute, Max: 5 minutes
521
requires_session: Optional[bool] # Default: False
522
default_message_time_to_live: Optional[timedelta] # Default: TimeSpan.Max
523
dead_lettering_on_filter_evaluation_exceptions: Optional[bool] # Default: True
524
dead_lettering_on_message_expiration: Optional[bool] # Default: False
525
duplicate_detection_history_time_window: Optional[timedelta] # Default: 10 minutes
526
max_delivery_count: Optional[int] # Default: 10
527
status: Optional[Union[str, EntityStatus]] # Default: Active
528
enable_batched_operations: Optional[bool] # Default: True
529
auto_delete_on_idle: Optional[timedelta] # Minimum: 5 minutes
530
forward_to: Optional[str] # Queue/topic name for message forwarding
531
forward_dead_lettered_messages_to: Optional[str] # Dead letter forwarding destination
532
```