0
# Core Management
1
2
This section covers primary Event Hub resource management: namespaces, Event Hubs, and consumer groups. These operations form the foundation of Event Hub solutions, providing the essential functionality for creating, configuring, and managing event streaming infrastructure.
3
4
## Capabilities
5
6
### Namespace Management
7
8
Namespaces are the primary containers for Event Hub resources, providing DNS names, capacity limits, and billing boundaries. They serve as the administrative and security boundary for Event Hub instances.
9
10
```python { .api }
11
class NamespacesOperations:
12
def list(self, **kwargs) -> Iterable[EHNamespace]:
13
"""List all namespaces in the subscription."""
14
15
def list_by_resource_group(
16
self,
17
resource_group_name: str,
18
**kwargs
19
) -> Iterable[EHNamespace]:
20
"""List namespaces in a resource group."""
21
22
def begin_create_or_update(
23
self,
24
resource_group_name: str,
25
namespace_name: str,
26
parameters: Union[EHNamespace, IO[bytes]],
27
**kwargs
28
) -> LROPoller[EHNamespace]:
29
"""Create or update a namespace (long-running operation)."""
30
31
def begin_delete(
32
self,
33
resource_group_name: str,
34
namespace_name: str,
35
**kwargs
36
) -> LROPoller[None]:
37
"""Delete a namespace (long-running operation)."""
38
39
def get(
40
self,
41
resource_group_name: str,
42
namespace_name: str,
43
**kwargs
44
) -> EHNamespace:
45
"""Get namespace details."""
46
47
def update(
48
self,
49
resource_group_name: str,
50
namespace_name: str,
51
parameters: Union[EHNamespace, IO[bytes]],
52
**kwargs
53
) -> EHNamespace:
54
"""Update namespace properties."""
55
56
def check_name_availability(
57
self,
58
parameters: Union[CheckNameAvailabilityParameter, IO[bytes]],
59
**kwargs
60
) -> CheckNameAvailabilityResult:
61
"""Check if namespace name is available."""
62
```
63
64
#### Usage Example
65
66
```python
67
from azure.mgmt.eventhub.models import EHNamespace, Sku, Identity, ManagedServiceIdentityType
68
69
# Create namespace with system-assigned managed identity
70
namespace_params = EHNamespace(
71
location="East US",
72
sku=Sku(name="Standard", tier="Standard", capacity=1),
73
identity=Identity(type=ManagedServiceIdentityType.SYSTEM_ASSIGNED),
74
tags={"environment": "production", "team": "data-platform"}
75
)
76
77
# Start namespace creation (long-running operation)
78
namespace_operation = client.namespaces.begin_create_or_update(
79
resource_group_name="my-resource-group",
80
namespace_name="my-eventhub-namespace",
81
parameters=namespace_params
82
)
83
84
# Wait for completion
85
namespace = namespace_operation.result()
86
print(f"Created namespace: {namespace.name} in {namespace.location}")
87
88
# Update namespace tags
89
namespace_update = EHNamespace(
90
tags={"environment": "production", "team": "data-platform", "cost-center": "engineering"}
91
)
92
93
updated_namespace = client.namespaces.update(
94
resource_group_name="my-resource-group",
95
namespace_name="my-eventhub-namespace",
96
parameters=namespace_update
97
)
98
```
99
100
### Event Hub Management
101
102
Event Hubs are the individual message-streaming endpoints within a namespace. Each one is configured with a partition count, a message retention period, and optional capture settings for data archival.
103
104
```python { .api }
105
class EventHubsOperations:
106
def list_by_namespace(
107
self,
108
resource_group_name: str,
109
namespace_name: str,
110
**kwargs
111
) -> Iterable[Eventhub]:
112
"""List Event Hubs in a namespace."""
113
114
def create_or_update(
115
self,
116
resource_group_name: str,
117
namespace_name: str,
118
event_hub_name: str,
119
parameters: Union[Eventhub, IO[bytes]],
120
**kwargs
121
) -> Eventhub:
122
"""Create or update an Event Hub."""
123
124
def delete(
125
self,
126
resource_group_name: str,
127
namespace_name: str,
128
event_hub_name: str,
129
**kwargs
130
) -> None:
131
"""Delete an Event Hub."""
132
133
def get(
134
self,
135
resource_group_name: str,
136
namespace_name: str,
137
event_hub_name: str,
138
**kwargs
139
) -> Eventhub:
140
"""Get Event Hub details."""
141
```
142
143
#### Usage Example
144
145
```python
146
from azure.mgmt.eventhub.models import Eventhub, CaptureDescription
147
148
# Create Event Hub with capture enabled
149
capture_config = CaptureDescription(
150
enabled=True,
151
encoding="Avro",
152
interval_in_seconds=300, # 5 minutes
153
size_limit_in_bytes=104857600, # 100MB
154
destination={
155
"name": "EventHubArchive.AzureBlockBlob",
156
"properties": {
157
"storageAccountResourceId": "/subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{account}",
158
"blobContainer": "eventhub-capture",
159
"archiveNameFormat": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
160
}
161
}
162
)
163
164
eventhub_params = Eventhub(
165
message_retention_in_days=7,
166
partition_count=4,
167
status="Active",
168
capture_description=capture_config
169
)
170
171
eventhub = client.event_hubs.create_or_update(
172
resource_group_name="my-resource-group",
173
namespace_name="my-eventhub-namespace",
174
event_hub_name="telemetry-events",
175
parameters=eventhub_params
176
)
177
178
print(f"Created Event Hub: {eventhub.name} with {eventhub.partition_count} partitions")
179
```
180
181
### Consumer Group Management
182
183
Consumer groups enable multiple consuming applications to have separate views of the event stream, each maintaining its own offset position and processing events independently.
184
185
```python { .api }
186
class ConsumerGroupsOperations:
187
def create_or_update(
188
self,
189
resource_group_name: str,
190
namespace_name: str,
191
event_hub_name: str,
192
consumer_group_name: str,
193
parameters: Union[ConsumerGroup, IO[bytes]],
194
**kwargs
195
) -> ConsumerGroup:
196
"""Create or update a consumer group."""
197
198
def delete(
199
self,
200
resource_group_name: str,
201
namespace_name: str,
202
event_hub_name: str,
203
consumer_group_name: str,
204
**kwargs
205
) -> None:
206
"""Delete a consumer group."""
207
208
def get(
209
self,
210
resource_group_name: str,
211
namespace_name: str,
212
event_hub_name: str,
213
consumer_group_name: str,
214
**kwargs
215
) -> ConsumerGroup:
216
"""Get consumer group details."""
217
218
def list_by_event_hub(
219
self,
220
resource_group_name: str,
221
namespace_name: str,
222
event_hub_name: str,
223
**kwargs
224
) -> Iterable[ConsumerGroup]:
225
"""List consumer groups for an Event Hub."""
226
```
227
228
#### Usage Example
229
230
```python
231
from azure.mgmt.eventhub.models import ConsumerGroup
232
233
# Create consumer groups for different applications
234
consumer_groups = [
235
"analytics-processor",
236
"real-time-dashboard",
237
"audit-logger"
238
]
239
240
for cg_name in consumer_groups:
241
consumer_group_params = ConsumerGroup(
242
user_metadata=f"Consumer group for {cg_name} application"
243
)
244
245
consumer_group = client.consumer_groups.create_or_update(
246
resource_group_name="my-resource-group",
247
namespace_name="my-eventhub-namespace",
248
event_hub_name="telemetry-events",
249
consumer_group_name=cg_name,
250
parameters=consumer_group_params
251
)
252
253
print(f"Created consumer group: {consumer_group.name}")
254
255
# List all consumer groups
256
consumer_groups_list = client.consumer_groups.list_by_event_hub(
257
resource_group_name="my-resource-group",
258
namespace_name="my-eventhub-namespace",
259
event_hub_name="telemetry-events"
260
)
261
262
for cg in consumer_groups_list:
263
print(f"Consumer Group: {cg.name} - {cg.user_metadata}")
264
```
265
266
## Types
267
268
```python { .api }
269
class EHNamespace(TrackedResource):
270
def __init__(
271
self,
272
location: Optional[str] = None,
273
tags: Optional[Dict[str, str]] = None,
274
sku: Optional[Sku] = None,
275
identity: Optional[Identity] = None,
276
minimum_tls_version: Optional[Union[str, TlsVersion]] = None,
277
public_network_access: Optional[Union[str, PublicNetworkAccess]] = None,
278
disable_local_auth: Optional[bool] = None,
279
zone_redundant: Optional[bool] = None,
280
is_auto_inflate_enabled: Optional[bool] = None,
281
maximum_throughput_units: Optional[int] = None,
282
kafka_enabled: Optional[bool] = None,
283
encryption: Optional[Encryption] = None,
284
**kwargs: Any
285
): ...
286
287
sku: Optional[Sku]
288
identity: Optional[Identity]
289
provisioning_state: Optional[str]
290
status: Optional[str]
291
created_at: Optional[datetime]
292
updated_at: Optional[datetime]
293
service_bus_endpoint: Optional[str]
294
cluster_arm_id: Optional[str]
295
296
class Eventhub(ProxyResource):
297
def __init__(
298
self,
299
message_retention_in_days: Optional[int] = None,
300
partition_count: Optional[int] = None,
301
status: Optional[Union[str, EntityStatus]] = None,
302
capture_description: Optional[CaptureDescription] = None,
303
**kwargs: Any
304
): ...
305
306
partition_ids: Optional[List[str]]
307
created_at: Optional[datetime]
308
updated_at: Optional[datetime]
309
310
class ConsumerGroup(ProxyResource):
311
def __init__(
312
self,
313
user_metadata: Optional[str] = None,
314
**kwargs: Any
315
): ...
316
317
created_at: Optional[datetime]
318
updated_at: Optional[datetime]
319
320
class Sku:
321
def __init__(
322
self,
323
name: Union[str, SkuName],
324
tier: Optional[Union[str, SkuTier]] = None,
325
capacity: Optional[int] = None,
326
**kwargs: Any
327
): ...
328
329
class CheckNameAvailabilityParameter:
330
def __init__(
331
self,
332
name: str,
333
**kwargs: Any
334
): ...
335
336
class CheckNameAvailabilityResult:
337
def __init__(
338
self,
339
message: Optional[str] = None,
340
name_available: Optional[bool] = None,
341
reason: Optional[Union[str, UnavailableReason]] = None,
342
**kwargs: Any
343
): ...
344
```