Microsoft Azure Event Hub Management Client Library for Python
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Advanced enterprise capabilities for Azure Event Hub including dedicated clusters, disaster recovery, schema registry, and application groups. These features provide production-scale reliability, compliance, and operational capabilities for mission-critical event streaming scenarios.
Dedicated clusters provide isolated, single-tenant Event Hub infrastructure with guaranteed capacity, predictable performance, and enhanced security isolation for enterprise workloads.
class ClustersOperations:
    """Operations for managing dedicated Event Hub clusters.

    Signature reference only: each method lists its parameters and return
    type; the bodies are intentionally empty.
    """

    def list_available_cluster_region(
        self,
        **kwargs
    ) -> AvailableClustersList:
        """List available regions for cluster deployment."""

    def list_by_subscription(
        self,
        **kwargs
    ) -> Iterable[Cluster]:
        """List clusters in subscription."""

    def list_by_resource_group(
        self,
        resource_group_name: str,
        **kwargs
    ) -> Iterable[Cluster]:
        """List clusters in resource group."""

    def get(
        self,
        resource_group_name: str,
        cluster_name: str,
        **kwargs
    ) -> Cluster:
        """Get cluster details."""

    def begin_create_or_update(
        self,
        resource_group_name: str,
        cluster_name: str,
        parameters: Union[Cluster, IO[bytes]],
        **kwargs
    ) -> LROPoller[Cluster]:
        """Create or update cluster (long-running operation)."""

    def begin_update(
        self,
        resource_group_name: str,
        cluster_name: str,
        parameters: Union[Cluster, IO[bytes]],
        **kwargs
    ) -> LROPoller[Cluster]:
        """Update cluster (long-running operation)."""

    def begin_delete(
        self,
        resource_group_name: str,
        cluster_name: str,
        **kwargs
    ) -> LROPoller[None]:
        """Delete cluster (long-running operation)."""

    def list_namespaces(
        self,
        resource_group_name: str,
        cluster_name: str,
        **kwargs
    ) -> Iterable[EHNamespace]:
        """List namespaces in cluster."""
class ConfigurationOperations:
    """Operations for reading and updating a cluster's quota configuration.

    Signature reference only: the bodies are intentionally empty.
    """

    def patch(
        self,
        resource_group_name: str,
        cluster_name: str,
        parameters: Union[ClusterQuotaConfigurationProperties, IO[bytes]],
        **kwargs
    ) -> ClusterQuotaConfigurationProperties:
        """Update cluster configuration."""

    def get(
        self,
        resource_group_name: str,
        cluster_name: str,
        **kwargs
    ) -> ClusterQuotaConfigurationProperties:
        """Get cluster configuration."""


# Usage example imports. ClusterSku is imported under its own name rather
# than as an alias of Sku, so the namespace-level Sku stays distinct.
from azure.mgmt.eventhub.models import Cluster, ClusterSku, ClusterSkuName
# Check available regions for cluster deployment
available_regions = client.clusters.list_available_cluster_region()
# `value` may be absent on an empty response; AvailableCluster exposes the
# region through its `location` attribute.
for region in available_regions.value or []:
    print(f"Available region: {region.location}")

# Create dedicated cluster
cluster_params = Cluster(
    location="East US",
    sku=ClusterSku(name=ClusterSkuName.DEDICATED, capacity=1),
    tags={"environment": "production", "tier": "dedicated"}
)
cluster_operation = client.clusters.begin_create_or_update(
    resource_group_name="my-resource-group",
    cluster_name="my-eventhub-cluster",
    parameters=cluster_params
)

# Wait for cluster creation (can take 1+ hours)
cluster = cluster_operation.result()
print(f"Created cluster: {cluster.name} with capacity: {cluster.sku.capacity}")

# Configure cluster quota settings
from azure.mgmt.eventhub.models import ClusterQuotaConfigurationProperties

quota_config = ClusterQuotaConfigurationProperties(
    settings={
        "maxIngressUnitsPerCluster": "20",
        "maxEgressUnitsPerCluster": "20"
    }
)
updated_config = client.configuration.patch(
    resource_group_name="my-resource-group",
    cluster_name="my-eventhub-cluster",
    parameters=quota_config
)

# Associate namespace with cluster
# EHNamespace and the namespace-level Sku are not imported earlier in this
# example, so bring them into scope before use.
from azure.mgmt.eventhub.models import EHNamespace, Sku

namespace_params = EHNamespace(
    location="East US",
    sku=Sku(name="Premium", tier="Premium"),
    cluster_arm_id=cluster.id
)
namespace_operation = client.namespaces.begin_create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-cluster-namespace",
    parameters=namespace_params
)
namespace = namespace_operation.result()
print(f"Associated namespace {namespace.name} with cluster")

Disaster recovery provides automated geo-replication and failover capabilities to ensure business continuity and data availability across Azure regions.
class DisasterRecoveryConfigsOperations:
    """Operations for managing disaster recovery (Geo-DR) aliases of a namespace.

    Signature reference only: the bodies are intentionally empty.
    """

    def check_name_availability(
        self,
        resource_group_name: str,
        namespace_name: str,
        parameters: Union[CheckNameAvailabilityParameter, IO[bytes]],
        **kwargs
    ) -> CheckNameAvailabilityResult:
        """Check disaster recovery alias name availability."""

    def list(
        self,
        resource_group_name: str,
        namespace_name: str,
        **kwargs
    ) -> Iterable[ArmDisasterRecovery]:
        """List disaster recovery configurations."""

    def create_or_update(
        self,
        resource_group_name: str,
        namespace_name: str,
        alias: str,
        parameters: Union[ArmDisasterRecovery, IO[bytes]],
        **kwargs
    ) -> ArmDisasterRecovery:
        """Create or update disaster recovery configuration."""

    def delete(
        self,
        resource_group_name: str,
        namespace_name: str,
        alias: str,
        **kwargs
    ) -> None:
        """Delete disaster recovery configuration."""

    def get(
        self,
        resource_group_name: str,
        namespace_name: str,
        alias: str,
        **kwargs
    ) -> ArmDisasterRecovery:
        """Get disaster recovery configuration."""

    def break_pairing(
        self,
        resource_group_name: str,
        namespace_name: str,
        alias: str,
        **kwargs
    ) -> None:
        """Break disaster recovery pairing."""

    def fail_over(
        self,
        resource_group_name: str,
        namespace_name: str,
        alias: str,
        **kwargs
    ) -> None:
        """Initiate disaster recovery failover."""

    def list_authorization_rules(
        self,
        resource_group_name: str,
        namespace_name: str,
        alias: str,
        **kwargs
    ) -> Iterable[AuthorizationRule]:
        """List authorization rules for disaster recovery alias."""

    def get_authorization_rule(
        self,
        resource_group_name: str,
        namespace_name: str,
        alias: str,
        authorization_rule_name: str,
        **kwargs
    ) -> AuthorizationRule:
        """Get authorization rule for disaster recovery alias."""

    def list_keys(
        self,
        resource_group_name: str,
        namespace_name: str,
        alias: str,
        authorization_rule_name: str,
        **kwargs
    ) -> AccessKeys:
        """Get authorization rule keys for disaster recovery alias."""


# Usage example imports
from azure.mgmt.eventhub.models import ArmDisasterRecovery
# Create disaster recovery configuration
dr_config = ArmDisasterRecovery(
    partner_namespace="/subscriptions/{partner-sub}/resourceGroups/{partner-rg}/providers/Microsoft.EventHub/namespaces/{partner-namespace}",
    alternate_name="my-dr-alias"
)
dr_pairing = client.disaster_recovery_configs.create_or_update(
    resource_group_name="my-primary-rg",
    namespace_name="my-primary-namespace",
    alias="my-dr-alias",
    parameters=dr_config
)
print(f"Created DR configuration: {dr_pairing.name}")
print(f"Partner namespace: {dr_pairing.partner_namespace}")
print(f"Role: {dr_pairing.role}")
print(f"Provisioning state: {dr_pairing.provisioning_state}")

# Monitor replication status
dr_status = client.disaster_recovery_configs.get(
    resource_group_name="my-primary-rg",
    namespace_name="my-primary-namespace",
    alias="my-dr-alias"
)
if dr_status.provisioning_state == "Succeeded":
    print("Disaster recovery pairing is active")

# Get connection strings for the DR alias
dr_auth_rules = client.disaster_recovery_configs.list_authorization_rules(
    resource_group_name="my-primary-rg",
    namespace_name="my-primary-namespace",
    alias="my-dr-alias"
)
for rule in dr_auth_rules:
    dr_keys = client.disaster_recovery_configs.list_keys(
        resource_group_name="my-primary-rg",
        namespace_name="my-primary-namespace",
        alias="my-dr-alias",
        authorization_rule_name=rule.name
    )
    print(f"DR Connection String: {dr_keys.primary_connection_string}")

# Simulate disaster scenario - initiate failover
# NOTE(review): Geo-DR failover is invoked against the *secondary* (partner)
# namespace, not the primary. Replace these placeholders with the resource
# group and name of the partner namespace paired above - confirm against
# the Azure Geo-DR documentation for your API version.
client.disaster_recovery_configs.fail_over(
    resource_group_name="my-secondary-rg",
    namespace_name="my-secondary-namespace",
    alias="my-dr-alias"
)
print("Failover initiated - secondary namespace is now primary")

Schema registry provides centralized schema management for structured data, enabling schema evolution, compatibility checking, and data governance for Event Hub streams.
class SchemaRegistryOperations:
    """Operations for managing schema groups within a namespace.

    Signature reference only: the bodies are intentionally empty.
    """

    def list_by_namespace(
        self,
        resource_group_name: str,
        namespace_name: str,
        **kwargs
    ) -> Iterable[SchemaGroup]:
        """List schema groups in namespace."""

    def create_or_update(
        self,
        resource_group_name: str,
        namespace_name: str,
        schema_group_name: str,
        parameters: Union[SchemaGroup, IO[bytes]],
        **kwargs
    ) -> SchemaGroup:
        """Create or update schema group."""

    def delete(
        self,
        resource_group_name: str,
        namespace_name: str,
        schema_group_name: str,
        **kwargs
    ) -> None:
        """Delete schema group."""

    def get(
        self,
        resource_group_name: str,
        namespace_name: str,
        schema_group_name: str,
        **kwargs
    ) -> SchemaGroup:
        """Get schema group."""


# Usage example imports
from azure.mgmt.eventhub.models import SchemaGroup, SchemaCompatibility, SchemaType
# Create schema groups for different data formats
telemetry_schema_group = SchemaGroup(
    group_properties={
        "Namespace": "my-eventhub-namespace",
        "Name": "telemetry-schemas"
    },
    schema_compatibility=SchemaCompatibility.BACKWARD,
    schema_type=SchemaType.AVRO,
    user_metadata="Schema group for telemetry data formats"
)
telemetry_group = client.schema_registry.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-eventhub-namespace",
    schema_group_name="telemetry-schemas",
    parameters=telemetry_schema_group
)

# Create schema group for audit events
audit_schema_group = SchemaGroup(
    group_properties={
        "Namespace": "my-eventhub-namespace",
        "Name": "audit-schemas"
    },
    schema_compatibility=SchemaCompatibility.FORWARD,
    schema_type=SchemaType.AVRO,
    user_metadata="Schema group for audit event formats"
)
audit_group = client.schema_registry.create_or_update(
    resource_group_name="my-resource-group",
    namespace_name="my-eventhub-namespace",
    schema_group_name="audit-schemas",
    parameters=audit_schema_group
)

# List all schema groups
schema_groups = client.schema_registry.list_by_namespace(
    resource_group_name="my-resource-group",
    namespace_name="my-eventhub-namespace"
)
for group in schema_groups:
    print(f"Schema Group: {group.name}")
    print(f" Type: {group.schema_type}")
    print(f" Compatibility: {group.schema_compatibility}")
    print(f" Description: {group.user_metadata}")

Application groups provide resource quotas and throttling policies to ensure fair resource usage and prevent resource exhaustion in multi-tenant scenarios.
class ApplicationGroupOperations:
    """Operations for managing application groups within a namespace.

    Signature reference only: the bodies are intentionally empty.
    """

    def list_by_namespace(
        self,
        resource_group_name: str,
        namespace_name: str,
        **kwargs
    ) -> Iterable[ApplicationGroup]:
        """List application groups in namespace."""

    def create_or_update_application_group(
        self,
        resource_group_name: str,
        namespace_name: str,
        application_group_name: str,
        parameters: Union[ApplicationGroup, IO[bytes]],
        **kwargs
    ) -> ApplicationGroup:
        """Create or update application group."""

    def delete(
        self,
        resource_group_name: str,
        namespace_name: str,
        application_group_name: str,
        **kwargs
    ) -> None:
        """Delete application group."""

    def get(
        self,
        resource_group_name: str,
        namespace_name: str,
        application_group_name: str,
        **kwargs
    ) -> ApplicationGroup:
        """Get application group."""


# Usage example imports
from azure.mgmt.eventhub.models import (
    ApplicationGroup, ThrottlingPolicy,
    ApplicationGroupPolicyType, MetricId
)
# Create application group with throttling policies
throttling_policies = [
    ThrottlingPolicy(
        name="IngressBytesThrottle",
        type=ApplicationGroupPolicyType.THROTTLING_POLICY,
        rate_limit_threshold=1000000,  # 1MB/sec
        metric_id=MetricId.INCOMING_BYTES
    ),
    ThrottlingPolicy(
        name="IngressMessagesThrottle",
        type=ApplicationGroupPolicyType.THROTTLING_POLICY,
        rate_limit_threshold=1000,  # 1000 messages/sec
        metric_id=MetricId.INCOMING_MESSAGES
    )
]
app_group_params = ApplicationGroup(
    is_enabled=True,
    client_app_group_identifier="analytics-team",
    policies=throttling_policies
)
app_group = client.application_group.create_or_update_application_group(
    resource_group_name="my-resource-group",
    namespace_name="my-eventhub-namespace",
    application_group_name="analytics-app-group",
    parameters=app_group_params
)
print(f"Created application group: {app_group.name}")
print(f"Client identifier: {app_group.client_app_group_identifier}")
# `policies` is Optional on ApplicationGroup; treat a missing list as empty
# so len() cannot raise TypeError.
print(f"Policies count: {len(app_group.policies or [])}")
# List all application groups
app_groups = client.application_group.list_by_namespace(
    resource_group_name="my-resource-group",
    namespace_name="my-eventhub-namespace"
)
for group in app_groups:
    print(f"Application Group: {group.name}")
    print(f" Enabled: {group.is_enabled}")
    print(f" Client ID: {group.client_app_group_identifier}")
    # `policies` is Optional on ApplicationGroup; iterate nothing when unset.
    for policy in group.policies or []:
        print(f" Policy: {policy.name} - {policy.rate_limit_threshold} {policy.metric_id}")


class Cluster(TrackedResource):
    """Dedicated Event Hub cluster resource (signature reference)."""

    def __init__(
        self,
        location: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        sku: Optional[ClusterSku] = None,
        **kwargs: Any
    ): ...

    # Attributes listed for the cluster model.
    sku: Optional[ClusterSku]
    system_data: Optional[SystemData]
    created_at: Optional[str]
    updated_at: Optional[str]
    metric_id: Optional[str]
    status: Optional[str]
class ClusterSku:
    """SKU settings for a dedicated cluster: a SKU name and an optional capacity."""

    def __init__(
        self,
        name: Union[str, ClusterSkuName],
        capacity: Optional[int] = None,
        **kwargs: Any
    ): ...
class ArmDisasterRecovery(ProxyResource):
    """Disaster recovery alias configuration (signature reference)."""

    def __init__(
        self,
        partner_namespace: Optional[str] = None,
        alternate_name: Optional[str] = None,
        **kwargs: Any
    ): ...

    # Attributes listed for the disaster recovery model.
    provisioning_state: Optional[Union[str, ProvisioningStateDR]]
    role: Optional[Union[str, RoleDisasterRecovery]]
class SchemaGroup(ProxyResource):
    """Schema group resource in the schema registry (signature reference)."""

    def __init__(
        self,
        group_properties: Optional[Dict[str, str]] = None,
        user_metadata: Optional[str] = None,
        schema_compatibility: Optional[Union[str, SchemaCompatibility]] = None,
        schema_type: Optional[Union[str, SchemaType]] = None,
        **kwargs: Any
    ): ...

    # Attributes listed for the schema group model.
    created_at_utc: Optional[datetime]
    updated_at_utc: Optional[datetime]
    e_tag: Optional[str]
class ApplicationGroup(ProxyResource):
    """Application group resource carrying a client identifier and policies."""

    def __init__(
        self,
        is_enabled: Optional[bool] = None,
        client_app_group_identifier: Optional[str] = None,
        policies: Optional[List[ApplicationGroupPolicy]] = None,
        **kwargs: Any
    ): ...
class ThrottlingPolicy(ApplicationGroupPolicy):
    """Application group policy pairing a rate-limit threshold with a metric."""

    def __init__(
        self,
        name: str,
        type: Union[str, ApplicationGroupPolicyType],
        rate_limit_threshold: int,
        metric_id: Union[str, MetricId],
        **kwargs: Any
    ): ...
class ClusterQuotaConfigurationProperties:
    """Cluster quota configuration expressed as a string-to-string settings map."""

    def __init__(
        self,
        settings: Optional[Dict[str, str]] = None,
        **kwargs: Any
    ): ...
# Enums
class ClusterSkuName(str, Enum):
    """SKU names accepted by ClusterSku."""

    DEDICATED = "Dedicated"


class RoleDisasterRecovery(str, Enum):
    """Values for the `role` attribute of ArmDisasterRecovery."""

    PRIMARY = "Primary"
    PRIMARY_NOT_REPLICATING = "PrimaryNotReplicating"
    SECONDARY = "Secondary"


class ProvisioningStateDR(str, Enum):
    """Values for the `provisioning_state` attribute of ArmDisasterRecovery."""

    ACCEPTED = "Accepted"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"


class SchemaCompatibility(str, Enum):
    """Compatibility modes accepted by SchemaGroup."""

    NONE = "None"
    BACKWARD = "Backward"
    FORWARD = "Forward"


class SchemaType(str, Enum):
    """Schema formats accepted by SchemaGroup."""

    UNKNOWN = "Unknown"
    AVRO = "Avro"


class ApplicationGroupPolicyType(str, Enum):
    """Policy type discriminator used by ThrottlingPolicy."""

    THROTTLING_POLICY = "ThrottlingPolicy"


class MetricId(str, Enum):
    """Metrics a ThrottlingPolicy can be applied to."""

    INCOMING_BYTES = "IncomingBytes"
    OUTGOING_BYTES = "OutgoingBytes"
    INCOMING_MESSAGES = "IncomingMessages"
    OUTGOING_MESSAGES = "OutgoingMessages"

Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-eventhub