Microsoft Azure Event Hub Management Client Library for Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
This section covers the core Event Hub resource management operations for namespaces, Event Hubs, and consumer groups. These operations form the foundation of Event Hub solutions, providing the essential functionality for creating, configuring, and managing event streaming infrastructure.
Namespaces are the primary containers for Event Hub resources, providing DNS names, capacity limits, and billing boundaries. They serve as the administrative and security boundary for Event Hub instances.
class NamespacesOperations:
    """Operations for managing Event Hub namespaces.

    Reference stub: signatures only, method bodies are omitted. Methods
    prefixed with ``begin_`` return an ``LROPoller``; call ``.result()`` on
    the poller to wait for the long-running operation to finish. Request
    payloads (``parameters``) may be given either as a model instance or as
    a binary stream (``IO[bytes]``).
    """

    def list(self, **kwargs) -> Iterable[EHNamespace]:
        """List all namespaces in the subscription."""

    def list_by_resource_group(
        self,
        resource_group_name: str,
        **kwargs
    ) -> Iterable[EHNamespace]:
        """List namespaces in a resource group."""

    def begin_create_or_update(
        self,
        resource_group_name: str,
        namespace_name: str,
        parameters: Union[EHNamespace, IO[bytes]],
        **kwargs
    ) -> LROPoller[EHNamespace]:
        """Create or update a namespace (long-running operation)."""

    def begin_delete(
        self,
        resource_group_name: str,
        namespace_name: str,
        **kwargs
    ) -> LROPoller[None]:
        """Delete a namespace (long-running operation)."""

    def get(
        self,
        resource_group_name: str,
        namespace_name: str,
        **kwargs
    ) -> EHNamespace:
        """Get namespace details."""

    def update(
        self,
        resource_group_name: str,
        namespace_name: str,
        parameters: Union[EHNamespace, IO[bytes]],
        **kwargs
    ) -> EHNamespace:
        """Update namespace properties."""

    def check_name_availability(
        self,
        parameters: Union[CheckNameAvailabilityParameter, IO[bytes]],
        **kwargs
    ) -> CheckNameAvailabilityResult:
        """Check if namespace name is available."""


from azure.mgmt.eventhub.models import EHNamespace, Sku, Identity, ManagedServiceIdentityType
# Create namespace with system-assigned managed identity
namespace_params = EHNamespace(
location="East US",
sku=Sku(name="Standard", tier="Standard", capacity=1),
identity=Identity(type=ManagedServiceIdentityType.SYSTEM_ASSIGNED),
tags={"environment": "production", "team": "data-platform"}
)
# Start namespace creation (long-running operation)
namespace_operation = client.namespaces.begin_create_or_update(
resource_group_name="my-resource-group",
namespace_name="my-eventhub-namespace",
parameters=namespace_params
)
# Wait for completion
namespace = namespace_operation.result()
print(f"Created namespace: {namespace.name} in {namespace.location}")
# Update namespace tags
namespace_update = EHNamespace(
tags={"environment": "production", "team": "data-platform", "cost-center": "engineering"}
)
updated_namespace = client.namespaces.update(
resource_group_name="my-resource-group",
namespace_name="my-eventhub-namespace",
parameters=namespace_update
)Event Hubs are the individual message streaming endpoints within namespaces, configured with partition count, retention policies, and capture settings for data archival.
class EventHubsOperations:
    """Operations for managing Event Hubs inside a namespace.

    Reference stub: signatures only, method bodies are omitted. Every
    method is scoped by ``resource_group_name`` and ``namespace_name``;
    request payloads (``parameters``) may be given either as a model
    instance or as a binary stream (``IO[bytes]``).
    """

    def list_by_namespace(
        self,
        resource_group_name: str,
        namespace_name: str,
        **kwargs
    ) -> Iterable[Eventhub]:
        """List Event Hubs in a namespace."""

    def create_or_update(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        parameters: Union[Eventhub, IO[bytes]],
        **kwargs
    ) -> Eventhub:
        """Create or update an Event Hub."""

    def delete(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        **kwargs
    ) -> None:
        """Delete an Event Hub."""

    def get(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        **kwargs
    ) -> Eventhub:
        """Get Event Hub details."""


from azure.mgmt.eventhub.models import Eventhub, CaptureDescription
# Create Event Hub with capture enabled
capture_config = CaptureDescription(
enabled=True,
encoding="Avro",
interval_in_seconds=300, # 5 minutes
size_limit_in_bytes=104857600, # 100MB
destination={
"name": "EventHubArchive.AzureBlockBlob",
"properties": {
"storageAccountResourceId": "/subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{account}",
"blobContainer": "eventhub-capture",
"archiveNameFormat": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"
}
}
)
eventhub_params = Eventhub(
message_retention_in_days=7,
partition_count=4,
status="Active",
capture_description=capture_config
)
eventhub = client.event_hubs.create_or_update(
resource_group_name="my-resource-group",
namespace_name="my-eventhub-namespace",
event_hub_name="telemetry-events",
parameters=eventhub_params
)
print(f"Created Event Hub: {eventhub.name} with {eventhub.partition_count} partitions")Consumer groups enable multiple consuming applications to have separate views of the event stream, each maintaining their own offset position and processing independently.
class ConsumerGroupsOperations:
    """Operations for managing consumer groups of an Event Hub.

    Reference stub: signatures only, method bodies are omitted. Every
    method is scoped by ``resource_group_name``, ``namespace_name`` and
    ``event_hub_name``; request payloads (``parameters``) may be given
    either as a model instance or as a binary stream (``IO[bytes]``).
    """

    def create_or_update(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        consumer_group_name: str,
        parameters: Union[ConsumerGroup, IO[bytes]],
        **kwargs
    ) -> ConsumerGroup:
        """Create or update a consumer group."""

    def delete(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        consumer_group_name: str,
        **kwargs
    ) -> None:
        """Delete a consumer group."""

    def get(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        consumer_group_name: str,
        **kwargs
    ) -> ConsumerGroup:
        """Get consumer group details."""

    def list_by_event_hub(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        **kwargs
    ) -> Iterable[ConsumerGroup]:
        """List consumer groups for an Event Hub."""


from azure.mgmt.eventhub.models import ConsumerGroup
# NOTE: `client` is not defined in this snippet. It is assumed to be an
# already-constructed management client exposing `.consumer_groups` —
# confirm against the client setup shown elsewhere in the documentation.

# Create consumer groups for different applications
consumer_groups = [
    "analytics-processor",
    "real-time-dashboard",
    "audit-logger",
]

# One create_or_update call per name; each call returns the resulting group.
for cg_name in consumer_groups:
    consumer_group_params = ConsumerGroup(
        user_metadata=f"Consumer group for {cg_name} application"
    )
    consumer_group = client.consumer_groups.create_or_update(
        resource_group_name="my-resource-group",
        namespace_name="my-eventhub-namespace",
        event_hub_name="telemetry-events",
        consumer_group_name=cg_name,
        parameters=consumer_group_params,
    )
    print(f"Created consumer group: {consumer_group.name}")

# List all consumer groups
consumer_groups_list = client.consumer_groups.list_by_event_hub(
    resource_group_name="my-resource-group",
    namespace_name="my-eventhub-namespace",
    event_hub_name="telemetry-events",
)
for cg in consumer_groups_list:
    print(f"Consumer Group: {cg.name} - {cg.user_metadata}")


class EHNamespace(TrackedResource):
    """Event Hub namespace resource model (signature-only reference stub).

    Every constructor argument is optional and defaults to ``None``.
    """

    def __init__(
        self,
        location: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        sku: Optional[Sku] = None,
        identity: Optional[Identity] = None,
        minimum_tls_version: Optional[Union[str, TlsVersion]] = None,
        public_network_access: Optional[Union[str, PublicNetworkAccess]] = None,
        disable_local_auth: Optional[bool] = None,
        zone_redundant: Optional[bool] = None,
        is_auto_inflate_enabled: Optional[bool] = None,
        maximum_throughput_units: Optional[int] = None,
        kafka_enabled: Optional[bool] = None,
        encryption: Optional[Encryption] = None,
        **kwargs: Any
    ): ...

    # Attributes exposed on instances. Those that are not constructor
    # arguments (provisioning_state, status, created_at, updated_at,
    # service_bus_endpoint, cluster_arm_id) are presumably populated by the
    # service — confirm against the SDK model reference.
    sku: Optional[Sku]
    identity: Optional[Identity]
    provisioning_state: Optional[str]
    status: Optional[str]
    created_at: Optional[datetime]
    updated_at: Optional[datetime]
    service_bus_endpoint: Optional[str]
    cluster_arm_id: Optional[str]
class Eventhub(ProxyResource):
    """Event Hub resource model (signature-only reference stub).

    Every constructor argument is optional and defaults to ``None``.
    """

    def __init__(
        self,
        message_retention_in_days: Optional[int] = None,
        partition_count: Optional[int] = None,
        status: Optional[Union[str, EntityStatus]] = None,
        capture_description: Optional[CaptureDescription] = None,
        **kwargs: Any
    ): ...

    # Attributes exposed on instances that are not constructor arguments;
    # presumably populated by the service — confirm against the SDK model
    # reference.
    partition_ids: Optional[List[str]]
    created_at: Optional[datetime]
    updated_at: Optional[datetime]
class ConsumerGroup(ProxyResource):
    """Consumer group resource model (signature-only reference stub).

    The only named constructor argument is the optional ``user_metadata``
    string.
    """

    def __init__(
        self,
        user_metadata: Optional[str] = None,
        **kwargs: Any
    ): ...

    # Attributes exposed on instances that are not constructor arguments;
    # presumably populated by the service — confirm against the SDK model
    # reference.
    created_at: Optional[datetime]
    updated_at: Optional[datetime]
class Sku:
    """SKU settings for a namespace (signature-only reference stub).

    ``name`` is required; ``tier`` and ``capacity`` are optional and
    default to ``None``. ``name`` and ``tier`` accept either a plain string
    or the corresponding enum value.
    """

    def __init__(
        self,
        name: Union[str, SkuName],
        tier: Optional[Union[str, SkuTier]] = None,
        capacity: Optional[int] = None,
        **kwargs: Any
    ): ...
class CheckNameAvailabilityParameter:
    """Request payload for a name-availability check (signature-only stub).

    ``name`` is the required string to check.
    """

    def __init__(
        self,
        name: str,
        **kwargs: Any
    ): ...
class CheckNameAvailabilityResult:
def __init__(
self,
message: Optional[str] = None,
name_available: Optional[bool] = None,
reason: Optional[Union[str, UnavailableReason]] = None,
**kwargs: Any
): ...Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-eventhub