
tessl/pypi-azure-mgmt-iothub

Microsoft Azure IoT Hub Management Client Library for programmatic management of Azure IoT Hub resources through the Azure Resource Manager API


Event Hub Consumer Groups

Manage consumer groups on IoT Hub's built-in Event Hub-compatible endpoints. Each consumer group gives a reading application its own view of the device-to-cloud message stream, with separate offsets and processing state, so several applications can process the same telemetry independently.

Capabilities

List Consumer Groups

Retrieve all consumer groups configured for a specific Event Hub endpoint to understand the current processing topology and identify available consumer group names.

def list_event_hub_consumer_groups(
    resource_group_name: str, 
    resource_name: str, 
    event_hub_endpoint_name: str, 
    **kwargs
) -> ItemPaged[EventHubConsumerGroupInfo]:
    """
    Get list of all consumer groups for the Event Hub-compatible endpoint.
    
    Args:
        resource_group_name: Name of the resource group
        resource_name: Name of the IoT hub resource
        event_hub_endpoint_name: Name of the Event Hub endpoint (e.g., "events")
        
    Returns:
        ItemPaged[EventHubConsumerGroupInfo]: Paginated list of consumer groups with metadata
    """

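A common follow-up to the list call is to reduce the pager to plain names. The sketch below is one way to do that; `list_consumer_group_names` and the `fake_client` stand-in are hypothetical names introduced here so the example runs without Azure access. With a real `IotHubClient` you would pass the client in place of `fake_client`.

```python
from types import SimpleNamespace


def list_consumer_group_names(client, resource_group, hub_name, endpoint="events"):
    """Return the sorted names of all consumer groups on one endpoint."""
    pager = client.iot_hub_resource.list_event_hub_consumer_groups(
        resource_group, hub_name, endpoint
    )
    # Iterating the returned pager yields every item, one group at a time.
    return sorted(group.name for group in pager)


# Hypothetical stand-in for IotHubClient (not part of the SDK).
_fake_groups = [SimpleNamespace(name="analytics"), SimpleNamespace(name="$Default")]
fake_client = SimpleNamespace(
    iot_hub_resource=SimpleNamespace(
        list_event_hub_consumer_groups=lambda rg, hub, ep: iter(_fake_groups)
    )
)

names = list_consumer_group_names(fake_client, "myResourceGroup", "myIoTHub")
print(names)
```
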
Get Consumer Group Details

Retrieve detailed information about a specific consumer group including its configuration and metadata for monitoring and validation purposes.

def get_event_hub_consumer_group(
    resource_group_name: str, 
    resource_name: str, 
    event_hub_endpoint_name: str, 
    name: str, 
    **kwargs
) -> EventHubConsumerGroupInfo:
    """
    Get details of a specific consumer group for Event Hub endpoint.
    
    Args:
        resource_group_name: Name of the resource group
        resource_name: Name of the IoT hub resource
        event_hub_endpoint_name: Name of the Event Hub endpoint (e.g., "events")
        name: Name of the consumer group
        
    Returns:
        EventHubConsumerGroupInfo: Consumer group details and metadata
    """

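For logging or validation it can help to flatten the returned object into a small dictionary. The following is a minimal sketch under that assumption; `summarize_consumer_group`, the fake client, and the placeholder `type` and `etag` values are illustrative and not taken from the SDK.

```python
from types import SimpleNamespace


def summarize_consumer_group(client, resource_group, hub_name, endpoint, name):
    """Fetch one consumer group and return its identifying fields as a dict."""
    info = client.iot_hub_resource.get_event_hub_consumer_group(
        resource_group, hub_name, endpoint, name
    )
    # getattr with a default tolerates a response object that lacks a field.
    return {field: getattr(info, field, None) for field in ("name", "type", "etag")}


# Hypothetical stand-in for IotHubClient; the field values are placeholders.
fake_info = SimpleNamespace(
    name="analytics-processor", type="example-type", etag="example-etag"
)
fake_client = SimpleNamespace(
    iot_hub_resource=SimpleNamespace(
        get_event_hub_consumer_group=lambda rg, hub, ep, name: fake_info
    )
)

summary = summarize_consumer_group(
    fake_client, "myResourceGroup", "myIoTHub", "events", "analytics-processor"
)
print(summary)
```
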
Create Consumer Group

Create a new consumer group for independent processing of device-to-cloud messages, enabling parallel processing by multiple applications or services.

def create_event_hub_consumer_group(
    resource_group_name: str, 
    resource_name: str, 
    event_hub_endpoint_name: str, 
    name: str, 
    consumer_group_body: EventHubConsumerGroupBodyDescription, 
    **kwargs
) -> EventHubConsumerGroupInfo:
    """
    Create a new consumer group for the Event Hub-compatible endpoint.
    
    Args:
        resource_group_name: Name of the resource group
        resource_name: Name of the IoT hub resource
        event_hub_endpoint_name: Name of the Event Hub endpoint (e.g., "events")
        name: Name for the new consumer group
        consumer_group_body: Consumer group configuration and properties
        
    Returns:
        EventHubConsumerGroupInfo: Details of the created consumer group
    """

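Deployment scripts often want creation to be repeatable. One possible approach, sketched here, is to list the existing names first and call create only when the name is missing. `ensure_consumer_group`, `make_body`, and `FakeOps` are hypothetical names for illustration; `make_body` is a caller-supplied function that builds the request body, for example an `EventHubConsumerGroupBodyDescription`.

```python
from types import SimpleNamespace


def ensure_consumer_group(client, resource_group, hub_name, endpoint, name, make_body):
    """Create the consumer group only if it is not already listed.

    Returns True if a create call was issued, False if the group existed.
    """
    ops = client.iot_hub_resource
    existing = {
        group.name
        for group in ops.list_event_hub_consumer_groups(resource_group, hub_name, endpoint)
    }
    if name in existing:
        return False
    ops.create_event_hub_consumer_group(
        resource_group, hub_name, endpoint, name, make_body(name)
    )
    return True


class FakeOps:
    """Hypothetical in-memory stand-in for client.iot_hub_resource."""

    def __init__(self):
        self.names = ["$Default"]

    def list_event_hub_consumer_groups(self, rg, hub, endpoint):
        return [SimpleNamespace(name=n) for n in self.names]

    def create_event_hub_consumer_group(self, rg, hub, endpoint, name, body):
        self.names.append(name)
        return SimpleNamespace(name=name)


def make_body(name):
    # Placeholder body for the fake client; with the real SDK, build the
    # body model here instead.
    return {"properties": {"name": name}}


fake_client = SimpleNamespace(iot_hub_resource=FakeOps())
first = ensure_consumer_group(fake_client, "rg", "hub", "events", "alerts-processor", make_body)
second = ensure_consumer_group(fake_client, "rg", "hub", "events", "alerts-processor", make_body)
print(first, second)
```

The check and the create are two separate calls, so this is not atomic: two scripts running at the same time could both decide to create the group.
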
Delete Consumer Group

Remove a consumer group from an Event Hub-compatible endpoint once no application reads through it any longer.

def delete_event_hub_consumer_group(
    resource_group_name: str, 
    resource_name: str, 
    event_hub_endpoint_name: str, 
    name: str, 
    **kwargs
) -> None:
    """
    Delete a consumer group from the Event Hub-compatible endpoint.
    
    Args:
        resource_group_name: Name of the resource group
        resource_name: Name of the IoT hub resource
        event_hub_endpoint_name: Name of the Event Hub endpoint (e.g., "events")
        name: Name of the consumer group to delete
        
    Returns:
        None: Operation completed successfully
    """

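When deleting several groups in a batch, a guard against removing the wrong one can be useful. The sketch below skips names that are not present and names in a `protected` tuple, which defaults to `$Default`, the consumer group that exists on the built-in endpoint by default. `delete_consumer_groups` and `FakeOps` are hypothetical names used only for this illustration.

```python
from types import SimpleNamespace


def delete_consumer_groups(client, resource_group, hub_name, endpoint, names,
                           protected=("$Default",)):
    """Delete the named groups that exist and are not protected.

    Returns the list of names for which a delete call was issued.
    """
    ops = client.iot_hub_resource
    existing = {
        group.name
        for group in ops.list_event_hub_consumer_groups(resource_group, hub_name, endpoint)
    }
    deleted = []
    for name in names:
        if name in protected or name not in existing:
            continue  # never touch protected groups; ignore unknown names
        ops.delete_event_hub_consumer_group(resource_group, hub_name, endpoint, name)
        deleted.append(name)
    return deleted


class FakeOps:
    """Hypothetical in-memory stand-in for client.iot_hub_resource."""

    def __init__(self, names):
        self.names = list(names)

    def list_event_hub_consumer_groups(self, rg, hub, endpoint):
        return [SimpleNamespace(name=n) for n in self.names]

    def delete_event_hub_consumer_group(self, rg, hub, endpoint, name):
        self.names.remove(name)


fake_client = SimpleNamespace(
    iot_hub_resource=FakeOps(["$Default", "old-processor", "analytics"])
)
deleted = delete_consumer_groups(
    fake_client, "rg", "hub", "events", ["old-processor", "$Default", "missing"]
)
print(deleted, fake_client.iot_hub_resource.names)
```
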
Usage Examples

Creating and managing consumer groups for telemetry processing

from azure.identity import DefaultAzureCredential
from azure.mgmt.iothub import IotHubClient
from azure.mgmt.iothub.models import EventHubConsumerGroupBodyDescription

# Initialize client
credential = DefaultAzureCredential()
client = IotHubClient(credential, "subscription-id")

resource_group = "myResourceGroup"
hub_name = "myIoTHub"
events_endpoint = "events"  # Built-in Event Hub endpoint

# List existing consumer groups
print("Existing consumer groups:")
for group in client.iot_hub_resource.list_event_hub_consumer_groups(
    resource_group, hub_name, events_endpoint
):
    print(f"  - {group.name}")

# Create a new consumer group for analytics processing
analytics_group_config = EventHubConsumerGroupBodyDescription(
    properties={"name": "analytics-processor"}
)

analytics_group = client.iot_hub_resource.create_event_hub_consumer_group(
    resource_group, 
    hub_name, 
    events_endpoint,
    "analytics-processor",
    analytics_group_config
)
print(f"Created consumer group: {analytics_group.name}")

# Create consumer group for real-time alerts
alerts_group_config = EventHubConsumerGroupBodyDescription(
    properties={"name": "alerts-processor"}
)

alerts_group = client.iot_hub_resource.create_event_hub_consumer_group(
    resource_group, 
    hub_name, 
    events_endpoint,
    "alerts-processor", 
    alerts_group_config
)
print(f"Created consumer group: {alerts_group.name}")

Monitoring and cleanup of consumer groups

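from azure.core.exceptions import HttpResponseError, ResourceNotFoundError

# Get details of a specific consumer group
try:
    group_details = client.iot_hub_resource.get_event_hub_consumer_group(
        resource_group, hub_name, events_endpoint, "analytics-processor"
    )
    print(f"Consumer group '{group_details.name}' exists")
    print(f"  Resource type: {group_details.type}")
    print(f"  ETag: {group_details.etag}")
except ResourceNotFoundError as e:
    # HTTP 404: the hub, the endpoint, or the consumer group does not exist
    print(f"Consumer group not found: {e}")
except HttpResponseError as e:
    # Any other service error, such as an authorization failure
    print(f"Failed to get consumer group: {e}")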

# Clean up unused consumer groups
unused_groups = ["old-processor", "test-consumer"]
for group_name in unused_groups:
    try:
        client.iot_hub_resource.delete_event_hub_consumer_group(
            resource_group, hub_name, events_endpoint, group_name
        )
        print(f"Deleted consumer group: {group_name}")
    except Exception as e:
        print(f"Failed to delete {group_name}: {e}")

# Verify final consumer group list
print("\nRemaining consumer groups:")
for group in client.iot_hub_resource.list_event_hub_consumer_groups(
    resource_group, hub_name, events_endpoint
):
    print(f"  - {group.name}")

Setting up distributed processing architecture

# Create consumer groups for different processing functions
processing_groups = [
    ("telemetry-storage", "Long-term telemetry storage processor"),
    ("real-time-dashboard", "Real-time dashboard data processor"), 
    ("anomaly-detection", "ML-based anomaly detection processor"),
    ("device-health", "Device health monitoring processor")
]

created_groups = []
for group_name, description in processing_groups:
    try:
        group_config = EventHubConsumerGroupBodyDescription(
            properties={"name": group_name}
        )
        
        client.iot_hub_resource.create_event_hub_consumer_group(
            resource_group, hub_name, events_endpoint, group_name, group_config
        )
        created_groups.append(group_name)
        print(f"✓ Created consumer group: {group_name} ({description})")
    except Exception as e:
        print(f"✗ Failed to create {group_name}: {e}")

print(f"\nSuccessfully created {len(created_groups)} consumer groups")
print("Each group can now independently process device messages with separate offsets")
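Before creating groups for a processing topology like the one above, it can help to compare the desired set with what already exists. The function below is a minimal sketch of that comparison; `plan_consumer_group_changes` is a hypothetical helper, and it works on plain name lists so it needs no Azure access. The existing names would come from `list_event_hub_consumer_groups`.

```python
def plan_consumer_group_changes(existing, desired, protected=("$Default",)):
    """Return (to_create, to_delete) as sorted lists of group names.

    Groups named in `protected` are never scheduled for deletion.
    """
    existing_set, desired_set = set(existing), set(desired)
    to_create = sorted(desired_set - existing_set)
    to_delete = sorted(existing_set - desired_set - set(protected))
    return to_create, to_delete


to_create, to_delete = plan_consumer_group_changes(
    existing=["$Default", "old-processor", "telemetry-storage"],
    desired=["telemetry-storage", "anomaly-detection"],
)
print("create:", to_create)
print("delete:", to_delete)
```

Keeping the planning step separate from the API calls lets a script print or review the changes before applying them.
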

Types

EventHubConsumerGroupInfo

Primary model representing a consumer group configuration and metadata.

class EventHubConsumerGroupInfo:
    """Consumer group information for Event Hub endpoints."""
    name: str                    # Consumer group name
    type: str                    # Resource type identifier
    etag: str                    # ETag for concurrency control
    properties: Dict[str, Any]   # Consumer group properties and metadata

EventHubConsumerGroupBodyDescription

Configuration model for creating new consumer groups with specified properties and settings.

class EventHubConsumerGroupBodyDescription:
    """Consumer group creation configuration."""
    properties: Dict[str, Any]   # Consumer group properties to set during creation

