0
# Enterprise Features
1
2
Advanced enterprise capabilities for Azure Event Hub including dedicated clusters, disaster recovery, schema registry, and application groups. These features provide production-scale reliability, compliance, and operational capabilities for mission-critical event streaming scenarios.
3
4
## Capabilities
5
6
### Dedicated Cluster Management
7
8
Dedicated clusters provide isolated, single-tenant Event Hub infrastructure with guaranteed capacity, predictable performance, and enhanced security isolation for enterprise workloads.
9
10
```python { .api }
11
class ClustersOperations:
12
def list_available_cluster_region(
13
self,
14
**kwargs
15
) -> AvailableClustersList:
16
"""List available regions for cluster deployment."""
17
18
def list_by_subscription(
19
self,
20
**kwargs
21
) -> Iterable[Cluster]:
22
"""List clusters in subscription."""
23
24
def list_by_resource_group(
25
self,
26
resource_group_name: str,
27
**kwargs
28
) -> Iterable[Cluster]:
29
"""List clusters in resource group."""
30
31
def get(
32
self,
33
resource_group_name: str,
34
cluster_name: str,
35
**kwargs
36
) -> Cluster:
37
"""Get cluster details."""
38
39
def begin_create_or_update(
40
self,
41
resource_group_name: str,
42
cluster_name: str,
43
parameters: Union[Cluster, IO[bytes]],
44
**kwargs
45
) -> LROPoller[Cluster]:
46
"""Create or update cluster (long-running operation)."""
47
48
def begin_update(
49
self,
50
resource_group_name: str,
51
cluster_name: str,
52
parameters: Union[Cluster, IO[bytes]],
53
**kwargs
54
) -> LROPoller[Cluster]:
55
"""Update cluster (long-running operation)."""
56
57
def begin_delete(
58
self,
59
resource_group_name: str,
60
cluster_name: str,
61
**kwargs
62
) -> LROPoller[None]:
63
"""Delete cluster (long-running operation)."""
64
65
def list_namespaces(
66
self,
67
resource_group_name: str,
68
cluster_name: str,
69
**kwargs
70
) -> Iterable[EHNamespace]:
71
"""List namespaces in cluster."""
72
73
class ConfigurationOperations:
74
def patch(
75
self,
76
resource_group_name: str,
77
cluster_name: str,
78
parameters: Union[ClusterQuotaConfigurationProperties, IO[bytes]],
79
**kwargs
80
) -> ClusterQuotaConfigurationProperties:
81
"""Update cluster configuration."""
82
83
def get(
84
self,
85
resource_group_name: str,
86
cluster_name: str,
87
**kwargs
88
) -> ClusterQuotaConfigurationProperties:
89
"""Get cluster configuration."""
90
```
91
92
#### Usage Example
93
94
```python
95
from azure.mgmt.eventhub.models import Cluster, Sku as ClusterSku, ClusterSkuName
96
97
# Check available regions for cluster deployment
98
available_regions = client.clusters.list_available_cluster_region()
99
for region in available_regions.value:
100
print(f"Available region: {region.code}")
101
102
# Create dedicated cluster
103
cluster_params = Cluster(
104
location="East US",
105
sku=ClusterSku(name=ClusterSkuName.DEDICATED, capacity=1),
106
tags={"environment": "production", "tier": "dedicated"}
107
)
108
109
cluster_operation = client.clusters.begin_create_or_update(
110
resource_group_name="my-resource-group",
111
cluster_name="my-eventhub-cluster",
112
parameters=cluster_params
113
)
114
115
# Wait for cluster creation (can take 1+ hours)
116
cluster = cluster_operation.result()
117
print(f"Created cluster: {cluster.name} with capacity: {cluster.sku.capacity}")
118
119
# Configure cluster quota settings
120
from azure.mgmt.eventhub.models import ClusterQuotaConfigurationProperties
121
122
quota_config = ClusterQuotaConfigurationProperties(
123
settings={
124
"maxIngressUnitsPerCluster": "20",
125
"maxEgressUnitsPerCluster": "20"
126
}
127
)
128
129
updated_config = client.configuration.patch(
130
resource_group_name="my-resource-group",
131
cluster_name="my-eventhub-cluster",
132
parameters=quota_config
133
)
134
135
# Associate namespace with cluster
136
namespace_params = EHNamespace(
137
location="East US",
138
sku=Sku(name="Premium", tier="Premium"),
139
cluster_arm_id=cluster.id
140
)
141
142
namespace_operation = client.namespaces.begin_create_or_update(
143
resource_group_name="my-resource-group",
144
namespace_name="my-cluster-namespace",
145
parameters=namespace_params
146
)
147
148
namespace = namespace_operation.result()
149
print(f"Associated namespace {namespace.name} with cluster")
150
```
151
152
### Disaster Recovery Management
153
154
Disaster recovery provides automated geo-replication and failover capabilities to ensure business continuity and data availability across Azure regions.
155
156
```python { .api }
157
class DisasterRecoveryConfigsOperations:
158
def check_name_availability(
159
self,
160
resource_group_name: str,
161
namespace_name: str,
162
parameters: Union[CheckNameAvailabilityParameter, IO[bytes]],
163
**kwargs
164
) -> CheckNameAvailabilityResult:
165
"""Check disaster recovery alias name availability."""
166
167
def list(
168
self,
169
resource_group_name: str,
170
namespace_name: str,
171
**kwargs
172
) -> Iterable[ArmDisasterRecovery]:
173
"""List disaster recovery configurations."""
174
175
def create_or_update(
176
self,
177
resource_group_name: str,
178
namespace_name: str,
179
alias: str,
180
parameters: Union[ArmDisasterRecovery, IO[bytes]],
181
**kwargs
182
) -> ArmDisasterRecovery:
183
"""Create or update disaster recovery configuration."""
184
185
def delete(
186
self,
187
resource_group_name: str,
188
namespace_name: str,
189
alias: str,
190
**kwargs
191
) -> None:
192
"""Delete disaster recovery configuration."""
193
194
def get(
195
self,
196
resource_group_name: str,
197
namespace_name: str,
198
alias: str,
199
**kwargs
200
) -> ArmDisasterRecovery:
201
"""Get disaster recovery configuration."""
202
203
def break_pairing(
204
self,
205
resource_group_name: str,
206
namespace_name: str,
207
alias: str,
208
**kwargs
209
) -> None:
210
"""Break disaster recovery pairing."""
211
212
def fail_over(
213
self,
214
resource_group_name: str,
215
namespace_name: str,
216
alias: str,
217
**kwargs
218
) -> None:
219
"""Initiate disaster recovery failover."""
220
221
def list_authorization_rules(
222
self,
223
resource_group_name: str,
224
namespace_name: str,
225
alias: str,
226
**kwargs
227
) -> Iterable[AuthorizationRule]:
228
"""List authorization rules for disaster recovery alias."""
229
230
def get_authorization_rule(
231
self,
232
resource_group_name: str,
233
namespace_name: str,
234
alias: str,
235
authorization_rule_name: str,
236
**kwargs
237
) -> AuthorizationRule:
238
"""Get authorization rule for disaster recovery alias."""
239
240
def list_keys(
241
self,
242
resource_group_name: str,
243
namespace_name: str,
244
alias: str,
245
authorization_rule_name: str,
246
**kwargs
247
) -> AccessKeys:
248
"""Get authorization rule keys for disaster recovery alias."""
249
```
250
251
#### Usage Example
252
253
```python
254
from azure.mgmt.eventhub.models import ArmDisasterRecovery
255
256
# Create disaster recovery configuration
257
dr_config = ArmDisasterRecovery(
258
partner_namespace="/subscriptions/{partner-sub}/resourceGroups/{partner-rg}/providers/Microsoft.EventHub/namespaces/{partner-namespace}",
259
alternate_name="my-dr-alias"
260
)
261
262
dr_pairing = client.disaster_recovery_configs.create_or_update(
263
resource_group_name="my-primary-rg",
264
namespace_name="my-primary-namespace",
265
alias="my-dr-alias",
266
parameters=dr_config
267
)
268
269
print(f"Created DR configuration: {dr_pairing.name}")
270
print(f"Partner namespace: {dr_pairing.partner_namespace}")
271
print(f"Role: {dr_pairing.role}")
272
print(f"Provisioning state: {dr_pairing.provisioning_state}")
273
274
# Monitor replication status
275
dr_status = client.disaster_recovery_configs.get(
276
resource_group_name="my-primary-rg",
277
namespace_name="my-primary-namespace",
278
alias="my-dr-alias"
279
)
280
281
if dr_status.provisioning_state == "Succeeded":
282
print("Disaster recovery pairing is active")
283
284
# Get connection strings for the DR alias
285
dr_auth_rules = client.disaster_recovery_configs.list_authorization_rules(
286
resource_group_name="my-primary-rg",
287
namespace_name="my-primary-namespace",
288
alias="my-dr-alias"
289
)
290
291
for rule in dr_auth_rules:
292
dr_keys = client.disaster_recovery_configs.list_keys(
293
resource_group_name="my-primary-rg",
294
namespace_name="my-primary-namespace",
295
alias="my-dr-alias",
296
authorization_rule_name=rule.name
297
)
298
print(f"DR Connection String: {dr_keys.primary_connection_string}")
299
300
# Simulate disaster scenario - initiate failover
301
client.disaster_recovery_configs.fail_over(
302
resource_group_name="my-primary-rg",
303
namespace_name="my-primary-namespace",
304
alias="my-dr-alias"
305
)
306
307
print("Failover initiated - secondary namespace is now primary")
308
```
309
310
### Schema Registry Management
311
312
Schema registry provides centralized schema management for structured data, enabling schema evolution, compatibility checking, and data governance for Event Hub streams.
313
314
```python { .api }
315
class SchemaRegistryOperations:
316
def list_by_namespace(
317
self,
318
resource_group_name: str,
319
namespace_name: str,
320
**kwargs
321
) -> Iterable[SchemaGroup]:
322
"""List schema groups in namespace."""
323
324
def create_or_update(
325
self,
326
resource_group_name: str,
327
namespace_name: str,
328
schema_group_name: str,
329
parameters: Union[SchemaGroup, IO[bytes]],
330
**kwargs
331
) -> SchemaGroup:
332
"""Create or update schema group."""
333
334
def delete(
335
self,
336
resource_group_name: str,
337
namespace_name: str,
338
schema_group_name: str,
339
**kwargs
340
) -> None:
341
"""Delete schema group."""
342
343
def get(
344
self,
345
resource_group_name: str,
346
namespace_name: str,
347
schema_group_name: str,
348
**kwargs
349
) -> SchemaGroup:
350
"""Get schema group."""
351
```
352
353
#### Usage Example
354
355
```python
356
from azure.mgmt.eventhub.models import SchemaGroup, SchemaCompatibility, SchemaType
357
358
# Create schema groups for different data formats
359
telemetry_schema_group = SchemaGroup(
360
group_properties={
361
"Namespace": "my-eventhub-namespace",
362
"Name": "telemetry-schemas"
363
},
364
schema_compatibility=SchemaCompatibility.BACKWARD,
365
schema_type=SchemaType.AVRO,
366
user_metadata="Schema group for telemetry data formats"
367
)
368
369
telemetry_group = client.schema_registry.create_or_update(
370
resource_group_name="my-resource-group",
371
namespace_name="my-eventhub-namespace",
372
schema_group_name="telemetry-schemas",
373
parameters=telemetry_schema_group
374
)
375
376
# Create schema group for audit events
377
audit_schema_group = SchemaGroup(
378
group_properties={
379
"Namespace": "my-eventhub-namespace",
380
"Name": "audit-schemas"
381
},
382
schema_compatibility=SchemaCompatibility.FORWARD,
383
schema_type=SchemaType.AVRO,
384
user_metadata="Schema group for audit event formats"
385
)
386
387
audit_group = client.schema_registry.create_or_update(
388
resource_group_name="my-resource-group",
389
namespace_name="my-eventhub-namespace",
390
schema_group_name="audit-schemas",
391
parameters=audit_schema_group
392
)
393
394
# List all schema groups
395
schema_groups = client.schema_registry.list_by_namespace(
396
resource_group_name="my-resource-group",
397
namespace_name="my-eventhub-namespace"
398
)
399
400
for group in schema_groups:
401
print(f"Schema Group: {group.name}")
402
print(f" Type: {group.schema_type}")
403
print(f" Compatibility: {group.schema_compatibility}")
404
print(f" Description: {group.user_metadata}")
405
```
406
407
### Application Group Management
408
409
Application groups provide resource quotas and throttling policies to ensure fair resource usage and prevent resource exhaustion in multi-tenant scenarios.
410
411
```python { .api }
412
class ApplicationGroupOperations:
413
def list_by_namespace(
414
self,
415
resource_group_name: str,
416
namespace_name: str,
417
**kwargs
418
) -> Iterable[ApplicationGroup]:
419
"""List application groups in namespace."""
420
421
def create_or_update_application_group(
422
self,
423
resource_group_name: str,
424
namespace_name: str,
425
application_group_name: str,
426
parameters: Union[ApplicationGroup, IO[bytes]],
427
**kwargs
428
) -> ApplicationGroup:
429
"""Create or update application group."""
430
431
def delete(
432
self,
433
resource_group_name: str,
434
namespace_name: str,
435
application_group_name: str,
436
**kwargs
437
) -> None:
438
"""Delete application group."""
439
440
def get(
441
self,
442
resource_group_name: str,
443
namespace_name: str,
444
application_group_name: str,
445
**kwargs
446
) -> ApplicationGroup:
447
"""Get application group."""
448
```
449
450
#### Usage Example
451
452
```python
453
from azure.mgmt.eventhub.models import (
454
ApplicationGroup, ThrottlingPolicy,
455
ApplicationGroupPolicyType, MetricId
456
)
457
458
# Create application group with throttling policies
459
throttling_policies = [
460
ThrottlingPolicy(
461
name="IngressBytesThrottle",
462
type=ApplicationGroupPolicyType.THROTTLING_POLICY,
463
rate_limit_threshold=1000000, # 1MB/sec
464
metric_id=MetricId.INCOMING_BYTES
465
),
466
ThrottlingPolicy(
467
name="IngressMessagesThrottle",
468
type=ApplicationGroupPolicyType.THROTTLING_POLICY,
469
rate_limit_threshold=1000, # 1000 messages/sec
470
metric_id=MetricId.INCOMING_MESSAGES
471
)
472
]
473
474
app_group_params = ApplicationGroup(
475
is_enabled=True,
476
client_app_group_identifier="analytics-team",
477
policies=throttling_policies
478
)
479
480
app_group = client.application_group.create_or_update_application_group(
481
resource_group_name="my-resource-group",
482
namespace_name="my-eventhub-namespace",
483
application_group_name="analytics-app-group",
484
parameters=app_group_params
485
)
486
487
print(f"Created application group: {app_group.name}")
488
print(f"Client identifier: {app_group.client_app_group_identifier}")
489
print(f"Policies count: {len(app_group.policies)}")
490
491
# List all application groups
492
app_groups = client.application_group.list_by_namespace(
493
resource_group_name="my-resource-group",
494
namespace_name="my-eventhub-namespace"
495
)
496
497
for group in app_groups:
498
print(f"Application Group: {group.name}")
499
print(f" Enabled: {group.is_enabled}")
500
print(f" Client ID: {group.client_app_group_identifier}")
501
for policy in group.policies:
502
print(f" Policy: {policy.name} - {policy.rate_limit_threshold} {policy.metric_id}")
503
```
504
505
## Types
506
507
```python { .api }
508
class Cluster(TrackedResource):
509
def __init__(
510
self,
511
location: Optional[str] = None,
512
tags: Optional[Dict[str, str]] = None,
513
sku: Optional[ClusterSku] = None,
514
**kwargs: Any
515
): ...
516
517
sku: Optional[ClusterSku]
518
system_data: Optional[SystemData]
519
created_at: Optional[str]
520
updated_at: Optional[str]
521
metric_id: Optional[str]
522
status: Optional[str]
523
524
class ClusterSku:
525
def __init__(
526
self,
527
name: Union[str, ClusterSkuName],
528
capacity: Optional[int] = None,
529
**kwargs: Any
530
): ...
531
532
class ArmDisasterRecovery(ProxyResource):
533
def __init__(
534
self,
535
partner_namespace: Optional[str] = None,
536
alternate_name: Optional[str] = None,
537
**kwargs: Any
538
): ...
539
540
provisioning_state: Optional[Union[str, ProvisioningStateDR]]
541
role: Optional[Union[str, RoleDisasterRecovery]]
542
543
class SchemaGroup(ProxyResource):
544
def __init__(
545
self,
546
group_properties: Optional[Dict[str, str]] = None,
547
user_metadata: Optional[str] = None,
548
schema_compatibility: Optional[Union[str, SchemaCompatibility]] = None,
549
schema_type: Optional[Union[str, SchemaType]] = None,
550
**kwargs: Any
551
): ...
552
553
created_at_utc: Optional[datetime]
554
updated_at_utc: Optional[datetime]
555
e_tag: Optional[str]
556
557
class ApplicationGroup(ProxyResource):
558
def __init__(
559
self,
560
is_enabled: Optional[bool] = None,
561
client_app_group_identifier: Optional[str] = None,
562
policies: Optional[List[ApplicationGroupPolicy]] = None,
563
**kwargs: Any
564
): ...
565
566
class ThrottlingPolicy(ApplicationGroupPolicy):
567
def __init__(
568
self,
569
name: str,
570
type: Union[str, ApplicationGroupPolicyType],
571
rate_limit_threshold: int,
572
metric_id: Union[str, MetricId],
573
**kwargs: Any
574
): ...
575
576
class ClusterQuotaConfigurationProperties:
577
def __init__(
578
self,
579
settings: Optional[Dict[str, str]] = None,
580
**kwargs: Any
581
): ...
582
583
# Enums
584
class ClusterSkuName(str, Enum):
585
DEDICATED = "Dedicated"
586
587
class RoleDisasterRecovery(str, Enum):
588
PRIMARY = "Primary"
589
PRIMARY_NOT_REPLICATING = "PrimaryNotReplicating"
590
SECONDARY = "Secondary"
591
592
class ProvisioningStateDR(str, Enum):
593
ACCEPTED = "Accepted"
594
SUCCEEDED = "Succeeded"
595
FAILED = "Failed"
596
597
class SchemaCompatibility(str, Enum):
598
NONE = "None"
599
BACKWARD = "Backward"
600
FORWARD = "Forward"
601
602
class SchemaType(str, Enum):
603
UNKNOWN = "Unknown"
604
AVRO = "Avro"
605
606
class ApplicationGroupPolicyType(str, Enum):
607
THROTTLING_POLICY = "ThrottlingPolicy"
608
609
class MetricId(str, Enum):
610
INCOMING_BYTES = "IncomingBytes"
611
OUTGOING_BYTES = "OutgoingBytes"
612
INCOMING_MESSAGES = "IncomingMessages"
613
OUTGOING_MESSAGES = "OutgoingMessages"
614
```