CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure-mgmt-eventhub

Microsoft Azure Event Hub Management Client Library for Python

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/core-management.md

Core Management

Core management covers the primary Event Hub resources: namespaces, Event Hubs, and consumer groups. These operations form the foundation of Event Hub solutions, providing the essential functionality for creating, configuring, and managing event streaming infrastructure.

Capabilities

Namespace Management

Namespaces are the primary containers for Event Hub resources, providing DNS names, capacity limits, and billing boundaries. They serve as the administrative and security boundary for Event Hub instances.

class NamespacesOperations:
    """Signature reference for namespace-level operations.

    Only signatures and summaries are listed; method bodies are omitted.
    The usage example in this document reaches these methods through
    ``client.namespaces``.
    """

    def list(self, **kwargs) -> Iterable[EHNamespace]:
        """List all namespaces in the subscription.

        Args:
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            An iterable of ``EHNamespace`` objects.
        """
    
    def list_by_resource_group(
        self,
        resource_group_name: str,
        **kwargs
    ) -> Iterable[EHNamespace]:
        """List namespaces in a resource group.

        Args:
            resource_group_name: Resource group whose namespaces are listed.
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            An iterable of ``EHNamespace`` objects.
        """
    
    def begin_create_or_update(
        self,
        resource_group_name: str,
        namespace_name: str,
        parameters: Union[EHNamespace, IO[bytes]],
        **kwargs
    ) -> LROPoller[EHNamespace]:
        """Create or update a namespace (long-running operation).

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Name of the namespace to create or update.
            parameters: Namespace definition, given either as an
                ``EHNamespace`` model or as a binary stream (``IO[bytes]``).
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            An ``LROPoller``; the usage example calls ``result()`` on it to
            wait for completion and obtain the ``EHNamespace``.
        """
    
    def begin_delete(
        self,
        resource_group_name: str,
        namespace_name: str,
        **kwargs
    ) -> LROPoller[None]:
        """Delete a namespace (long-running operation).

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Name of the namespace to delete.
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            An ``LROPoller`` that carries no result value.
        """
    
    def get(
        self,
        resource_group_name: str,
        namespace_name: str,
        **kwargs
    ) -> EHNamespace:
        """Get namespace details.

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Name of the namespace to look up.
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            The namespace as an ``EHNamespace``.
        """
    
    def update(
        self,
        resource_group_name: str,
        namespace_name: str,
        parameters: Union[EHNamespace, IO[bytes]],
        **kwargs
    ) -> EHNamespace:
        """Update namespace properties.

        Unlike ``begin_create_or_update``, this signature returns the
        ``EHNamespace`` directly rather than a poller.

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Name of the namespace to update.
            parameters: Properties to apply, given either as an
                ``EHNamespace`` model or as a binary stream (``IO[bytes]``).
                The usage example passes an ``EHNamespace`` carrying only
                ``tags``.
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            The namespace as an ``EHNamespace``.
        """
    
    def check_name_availability(
        self,
        parameters: Union[CheckNameAvailabilityParameter, IO[bytes]],
        **kwargs
    ) -> CheckNameAvailabilityResult:
        """Check if namespace name is available.

        Args:
            parameters: The name to check, given either as a
                ``CheckNameAvailabilityParameter`` or as a binary stream
                (``IO[bytes]``).
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            A ``CheckNameAvailabilityResult``.
        """

Usage Example

from azure.mgmt.eventhub.models import EHNamespace, Sku, Identity, ManagedServiceIdentityType

# Keyword arguments that address the namespace in every call below
namespace_target = {
    "resource_group_name": "my-resource-group",
    "namespace_name": "my-eventhub-namespace",
}

# Describe the namespace: Standard SKU plus a system-assigned managed identity
standard_sku = Sku(name="Standard", tier="Standard", capacity=1)
system_identity = Identity(type=ManagedServiceIdentityType.SYSTEM_ASSIGNED)
namespace_params = EHNamespace(
    location="East US",
    sku=standard_sku,
    identity=system_identity,
    tags={"environment": "production", "team": "data-platform"},
)

# Creation is a long-running operation, so the call returns a poller
namespace_operation = client.namespaces.begin_create_or_update(
    **namespace_target,
    parameters=namespace_params,
)

# result() waits for completion and returns the created namespace
namespace = namespace_operation.result()
print(f"Created namespace: {namespace.name} in {namespace.location}")

# Send an update that carries only the new tag set
namespace_update = EHNamespace(
    tags={
        "environment": "production",
        "team": "data-platform",
        "cost-center": "engineering",
    },
)

updated_namespace = client.namespaces.update(
    **namespace_target,
    parameters=namespace_update,
)

Event Hub Management

Event Hubs are the individual message streaming endpoints within namespaces, configured with partition count, retention policies, and capture settings for data archival.

class EventHubsOperations:
    """Signature reference for Event Hub operations within a namespace.

    Only signatures and summaries are listed; method bodies are omitted.
    The usage example in this document reaches these methods through
    ``client.event_hubs``.
    """

    def list_by_namespace(
        self,
        resource_group_name: str,
        namespace_name: str,
        **kwargs
    ) -> Iterable[Eventhub]:
        """List Event Hubs in a namespace.

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Namespace whose Event Hubs are listed.
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            An iterable of ``Eventhub`` objects.
        """
    
    def create_or_update(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        parameters: Union[Eventhub, IO[bytes]],
        **kwargs
    ) -> Eventhub:
        """Create or update an Event Hub.

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Namespace that contains the Event Hub.
            event_hub_name: Name of the Event Hub to create or update.
            parameters: Event Hub definition, given either as an
                ``Eventhub`` model or as a binary stream (``IO[bytes]``).
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            The Event Hub as an ``Eventhub`` (returned directly, not via a
            poller).
        """
    
    def delete(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        **kwargs
    ) -> None:
        """Delete an Event Hub.

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Namespace that contains the Event Hub.
            event_hub_name: Name of the Event Hub to delete.
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            None.
        """
    
    def get(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        **kwargs
    ) -> Eventhub:
        """Get Event Hub details.

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Namespace that contains the Event Hub.
            event_hub_name: Name of the Event Hub to look up.
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            The Event Hub as an ``Eventhub``.
        """

Usage Example

from azure.mgmt.eventhub.models import Eventhub, CaptureDescription

# Capture destination, written in the same dictionary form as the original
capture_destination = {
    "name": "EventHubArchive.AzureBlockBlob",
    "properties": {
        "storageAccountResourceId": "/subscriptions/{sub}/resourceGroups/{rg}/providers/Microsoft.Storage/storageAccounts/{account}",
        "blobContainer": "eventhub-capture",
        "archiveNameFormat": "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}",
    },
}

# Capture settings: Avro encoding, 5-minute interval, 100 MiB size limit
capture_config = CaptureDescription(
    enabled=True,
    encoding="Avro",
    interval_in_seconds=5 * 60,
    size_limit_in_bytes=100 * 1024 * 1024,
    destination=capture_destination,
)

# Event Hub definition: 7-day retention, 4 partitions, capture attached
eventhub_params = Eventhub(
    message_retention_in_days=7,
    partition_count=4,
    status="Active",
    capture_description=capture_config,
)

# Keyword arguments that address the Event Hub being created
eventhub_target = {
    "resource_group_name": "my-resource-group",
    "namespace_name": "my-eventhub-namespace",
    "event_hub_name": "telemetry-events",
}

eventhub = client.event_hubs.create_or_update(
    **eventhub_target,
    parameters=eventhub_params,
)

print(f"Created Event Hub: {eventhub.name} with {eventhub.partition_count} partitions")

Consumer Group Management

Consumer groups enable multiple consuming applications to have separate views of the event stream, each maintaining its own offset position and processing independently.

class ConsumerGroupsOperations:
    """Signature reference for consumer group operations on an Event Hub.

    Only signatures and summaries are listed; method bodies are omitted.
    The usage example in this document reaches these methods through
    ``client.consumer_groups``.
    """

    def create_or_update(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        consumer_group_name: str,
        parameters: Union[ConsumerGroup, IO[bytes]],
        **kwargs
    ) -> ConsumerGroup:
        """Create or update a consumer group.

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Namespace that contains the Event Hub.
            event_hub_name: Event Hub the consumer group belongs to.
            consumer_group_name: Name of the consumer group to create or
                update.
            parameters: Consumer group definition, given either as a
                ``ConsumerGroup`` model or as a binary stream
                (``IO[bytes]``).
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            The consumer group as a ``ConsumerGroup``.
        """
    
    def delete(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        consumer_group_name: str,
        **kwargs
    ) -> None:
        """Delete a consumer group.

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Namespace that contains the Event Hub.
            event_hub_name: Event Hub the consumer group belongs to.
            consumer_group_name: Name of the consumer group to delete.
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            None.
        """
    
    def get(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        consumer_group_name: str,
        **kwargs
    ) -> ConsumerGroup:
        """Get consumer group details.

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Namespace that contains the Event Hub.
            event_hub_name: Event Hub the consumer group belongs to.
            consumer_group_name: Name of the consumer group to look up.
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            The consumer group as a ``ConsumerGroup``.
        """
    
    def list_by_event_hub(
        self,
        resource_group_name: str,
        namespace_name: str,
        event_hub_name: str,
        **kwargs
    ) -> Iterable[ConsumerGroup]:
        """List consumer groups for an Event Hub.

        Args:
            resource_group_name: Resource group that holds the namespace.
            namespace_name: Namespace that contains the Event Hub.
            event_hub_name: Event Hub whose consumer groups are listed.
            **kwargs: Extra keyword arguments; not described in this
                reference.

        Returns:
            An iterable of ``ConsumerGroup`` objects.
        """

Usage Example

from azure.mgmt.eventhub.models import ConsumerGroup

# Keyword arguments that address the Event Hub in every call below
event_hub_target = {
    "resource_group_name": "my-resource-group",
    "namespace_name": "my-eventhub-namespace",
    "event_hub_name": "telemetry-events",
}

# One consumer group per consuming application
consumer_groups = ["analytics-processor", "real-time-dashboard", "audit-logger"]

for cg_name in consumer_groups:
    # Record which application the group is meant for in its metadata
    consumer_group_params = ConsumerGroup(
        user_metadata=f"Consumer group for {cg_name} application",
    )
    consumer_group = client.consumer_groups.create_or_update(
        **event_hub_target,
        consumer_group_name=cg_name,
        parameters=consumer_group_params,
    )
    print(f"Created consumer group: {consumer_group.name}")

# Enumerate every consumer group on the Event Hub and show its metadata
consumer_groups_list = client.consumer_groups.list_by_event_hub(**event_hub_target)

for cg in consumer_groups_list:
    print(f"Consumer Group: {cg.name} - {cg.user_metadata}")

Types

class EHNamespace(TrackedResource):
    """Namespace resource model.

    In this document it is both the ``parameters`` payload accepted by the
    namespace create/update operations and the type those operations
    return. Every constructor argument is optional.
    """

    def __init__(
        self,
        location: Optional[str] = None,
        tags: Optional[Dict[str, str]] = None,
        sku: Optional[Sku] = None,
        identity: Optional[Identity] = None,
        minimum_tls_version: Optional[Union[str, TlsVersion]] = None,
        public_network_access: Optional[Union[str, PublicNetworkAccess]] = None,
        disable_local_auth: Optional[bool] = None,
        zone_redundant: Optional[bool] = None,
        is_auto_inflate_enabled: Optional[bool] = None,
        maximum_throughput_units: Optional[int] = None,
        kafka_enabled: Optional[bool] = None,
        encryption: Optional[Encryption] = None,
        **kwargs: Any
    ): ...
    
    # Attributes listed by this reference. ``sku`` and ``identity`` mirror
    # constructor arguments; the remaining ones have no constructor
    # argument here and are presumably populated by the service --
    # TODO confirm against the SDK reference.
    sku: Optional[Sku]
    identity: Optional[Identity]
    provisioning_state: Optional[str]
    status: Optional[str]
    created_at: Optional[datetime]
    updated_at: Optional[datetime]
    service_bus_endpoint: Optional[str]
    cluster_arm_id: Optional[str]

class Eventhub(ProxyResource):
    """Event Hub resource model.

    In this document it is both the ``parameters`` payload accepted by
    ``EventHubsOperations.create_or_update`` and the type returned by the
    Event Hub operations. Every constructor argument is optional.
    """

    def __init__(
        self,
        message_retention_in_days: Optional[int] = None,
        partition_count: Optional[int] = None,
        status: Optional[Union[str, EntityStatus]] = None,
        capture_description: Optional[CaptureDescription] = None,
        **kwargs: Any
    ): ...
    
    # These attributes have no constructor argument in this reference;
    # presumably they are populated by the service -- TODO confirm.
    partition_ids: Optional[List[str]]
    created_at: Optional[datetime]
    updated_at: Optional[datetime]

class ConsumerGroup(ProxyResource):
    """Consumer group resource model.

    In this document it is both the ``parameters`` payload accepted by
    ``ConsumerGroupsOperations.create_or_update`` and the type returned by
    the consumer group operations.
    """

    def __init__(
        self,
        user_metadata: Optional[str] = None,  # free-form text; the usage example stores a description
        **kwargs: Any
    ): ...
    
    # These attributes have no constructor argument in this reference;
    # presumably they are populated by the service -- TODO confirm.
    created_at: Optional[datetime]
    updated_at: Optional[datetime]

class Sku:
    """SKU settings passed to ``EHNamespace`` through its ``sku`` argument.

    ``name`` is the only required argument. The usage example builds it as
    ``Sku(name="Standard", tier="Standard", capacity=1)``.
    """

    def __init__(
        self,
        name: Union[str, SkuName],  # required: no default value
        tier: Optional[Union[str, SkuTier]] = None,
        capacity: Optional[int] = None,
        **kwargs: Any
    ): ...

class CheckNameAvailabilityParameter:
    """Request payload for ``NamespacesOperations.check_name_availability``."""

    def __init__(
        self,
        name: str,  # required: the name to check
        **kwargs: Any
    ): ...

class CheckNameAvailabilityResult:
    """Result type of ``NamespacesOperations.check_name_availability``.

    Every constructor argument is optional.
    """

    def __init__(
        self,
        message: Optional[str] = None,
        name_available: Optional[bool] = None,  # presumably True when the name can be used -- TODO confirm
        reason: Optional[Union[str, UnavailableReason]] = None,  # presumably set when the name is unavailable -- TODO confirm
        **kwargs: Any
    ): ...

Install with Tessl CLI

npx tessl i tessl/pypi-azure-mgmt-eventhub

docs

core-management.md

enterprise-features.md

index.md

security-access-control.md

tile.json