Microsoft Azure IoT Hub Management Client Library for programmatic management of Azure IoT Hub resources through the Azure Resource Manager API
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Management of Event Hub consumer groups for IoT Hub's built-in Event Hub endpoints, enabling multiple applications to independently read device-to-cloud messages with separate offsets and processing states for distributed telemetry processing architectures.
Retrieve all consumer groups configured for a specific Event Hub endpoint to understand the current processing topology and identify available consumer group names.
def list_event_hub_consumer_groups(
resource_group_name: str,
resource_name: str,
event_hub_endpoint_name: str,
**kwargs
) -> ItemPaged[EventHubConsumerGroupInfo]:
"""
Get list of all consumer groups for the Event Hub-compatible endpoint.
Args:
resource_group_name: Name of the resource group
resource_name: Name of the IoT hub resource
event_hub_endpoint_name: Name of the Event Hub endpoint (e.g., "events")
Returns:
ItemPaged[EventHubConsumerGroupInfo]: Paginated list of consumer groups with metadata
"""Retrieve detailed information about a specific consumer group including its configuration and metadata for monitoring and validation purposes.
def get_event_hub_consumer_group(
resource_group_name: str,
resource_name: str,
event_hub_endpoint_name: str,
name: str,
**kwargs
) -> EventHubConsumerGroupInfo:
"""
Get details of a specific consumer group for Event Hub endpoint.
Args:
resource_group_name: Name of the resource group
resource_name: Name of the IoT hub resource
event_hub_endpoint_name: Name of the Event Hub endpoint (e.g., "events")
name: Name of the consumer group
Returns:
EventHubConsumerGroupInfo: Consumer group details and metadata
"""Create a new consumer group for independent processing of device-to-cloud messages, enabling parallel processing by multiple applications or services.
def create_event_hub_consumer_group(
resource_group_name: str,
resource_name: str,
event_hub_endpoint_name: str,
name: str,
consumer_group_body: EventHubConsumerGroupBodyDescription,
**kwargs
) -> EventHubConsumerGroupInfo:
"""
Create a new consumer group for the Event Hub-compatible endpoint.
Args:
resource_group_name: Name of the resource group
resource_name: Name of the IoT hub resource
event_hub_endpoint_name: Name of the Event Hub endpoint (e.g., "events")
name: Name for the new consumer group
consumer_group_body: Consumer group configuration and properties
Returns:
EventHubConsumerGroupInfo: Details of the created consumer group
"""Remove a consumer group when it's no longer needed, ensuring proper cleanup of Event Hub endpoint resources and consumer group allocations.
def delete_event_hub_consumer_group(
resource_group_name: str,
resource_name: str,
event_hub_endpoint_name: str,
name: str,
**kwargs
) -> None:
"""
Delete a consumer group from the Event Hub-compatible endpoint.
Args:
resource_group_name: Name of the resource group
resource_name: Name of the IoT hub resource
event_hub_endpoint_name: Name of the Event Hub endpoint (e.g., "events")
name: Name of the consumer group to delete
Returns:
None: Operation completed successfully
"""from azure.identity import DefaultAzureCredential
from azure.mgmt.iothub import IotHubClient
from azure.mgmt.iothub.models import EventHubConsumerGroupBodyDescription
# Initialize client
credential = DefaultAzureCredential()
client = IotHubClient(credential, "subscription-id")
resource_group = "myResourceGroup"
hub_name = "myIoTHub"
events_endpoint = "events" # Built-in Event Hub endpoint
# List existing consumer groups
print("Existing consumer groups:")
for group in client.iot_hub_resource.list_event_hub_consumer_groups(
resource_group, hub_name, events_endpoint
):
print(f" - {group.name}")
# Create a new consumer group for analytics processing
analytics_group_config = EventHubConsumerGroupBodyDescription(
properties={"name": "analytics-processor"}
)
analytics_group = client.iot_hub_resource.create_event_hub_consumer_group(
resource_group,
hub_name,
events_endpoint,
"analytics-processor",
analytics_group_config
)
print(f"Created consumer group: {analytics_group.name}")
# Create consumer group for real-time alerts
alerts_group_config = EventHubConsumerGroupBodyDescription(
properties={"name": "alerts-processor"}
)
alerts_group = client.iot_hub_resource.create_event_hub_consumer_group(
resource_group,
hub_name,
events_endpoint,
"alerts-processor",
alerts_group_config
)
print(f"Created consumer group: {alerts_group.name}")# Get details of a specific consumer group
try:
group_details = client.iot_hub_resource.get_event_hub_consumer_group(
resource_group, hub_name, events_endpoint, "analytics-processor"
)
print(f"Consumer group '{group_details.name}' exists")
print(f" Resource type: {group_details.type}")
print(f" ETag: {group_details.etag}")
except Exception as e:
print(f"Consumer group not found: {e}")
# Clean up unused consumer groups
unused_groups = ["old-processor", "test-consumer"]
for group_name in unused_groups:
try:
client.iot_hub_resource.delete_event_hub_consumer_group(
resource_group, hub_name, events_endpoint, group_name
)
print(f"Deleted consumer group: {group_name}")
except Exception as e:
print(f"Failed to delete {group_name}: {e}")
# Verify final consumer group list
print("\nRemaining consumer groups:")
for group in client.iot_hub_resource.list_event_hub_consumer_groups(
resource_group, hub_name, events_endpoint
):
print(f" - {group.name}")# Create consumer groups for different processing functions
processing_groups = [
("telemetry-storage", "Long-term telemetry storage processor"),
("real-time-dashboard", "Real-time dashboard data processor"),
("anomaly-detection", "ML-based anomaly detection processor"),
("device-health", "Device health monitoring processor")
]
created_groups = []
for group_name, description in processing_groups:
try:
group_config = EventHubConsumerGroupBodyDescription(
properties={"name": group_name}
)
group = client.iot_hub_resource.create_event_hub_consumer_group(
resource_group, hub_name, events_endpoint, group_name, group_config
)
created_groups.append(group_name)
print(f"✓ Created consumer group: {group_name}")
except Exception as e:
print(f"✗ Failed to create {group_name}: {e}")
print(f"\nSuccessfully created {len(created_groups)} consumer groups")
print("Each group can now independently process device messages with separate offsets")Primary model representing a consumer group configuration and metadata.
class EventHubConsumerGroupInfo:
"""Consumer group information for Event Hub endpoints."""
name: str # Consumer group name
type: str # Resource type identifier
etag: str # ETag for concurrency control
properties: Dict[str, Any] # Consumer group properties and metadataConfiguration model for creating new consumer groups with specified properties and settings.
class EventHubConsumerGroupBodyDescription:
"""Consumer group creation configuration."""
properties: Dict[str, Any] # Consumer group properties to set during creationInstall with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-iothub