Microsoft Azure IoT Hub Management Client Library, for programmatic management of Azure IoT Hub resources through the Azure Resource Manager API.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Configuration and management of message routing from devices to various Azure services including Event Hubs, Service Bus, Storage, and CosmosDB. This module enables sophisticated message processing pipelines with routing rules, message enrichment, and endpoint health monitoring for scalable IoT solutions.
Test routing configurations before deployment to ensure messages are correctly routed to intended endpoints based on routing conditions.
def test_all_routes(
    iot_hub_name: str,
    resource_group_name: str,
    input: TestAllRoutesInput,
    **kwargs
) -> TestAllRoutesResult:
    """
    Test all routes configured in IoT Hub against a sample message.

    Note that the IoT hub name is the first positional argument and the
    resource group name the second.

    Args:
        iot_hub_name: Name of the IoT hub
        resource_group_name: Name of the resource group
        input: Test message and routing configuration

    Returns:
        TestAllRoutesResult: Results showing which routes matched and their details
    """
def test_route(
    iot_hub_name: str,
    resource_group_name: str,
    input: TestRouteInput,
    **kwargs
) -> TestRouteResult:
    """
    Test a new route for IoT Hub against a sample message.

    Note that the IoT hub name is the first positional argument and the
    resource group name the second.

    Args:
        iot_hub_name: Name of the IoT hub
        resource_group_name: Name of the resource group
        input: Route definition and test message

    Returns:
        TestRouteResult: Result indicating whether the route matched
    """


# Monitor the health status of routing endpoints to ensure reliable message
# delivery and identify connectivity issues.
def get_endpoint_health(
    resource_group_name: str,
    iot_hub_name: str,
    **kwargs
) -> ItemPaged[EndpointHealthData]:
    """
    Get health status for routing endpoints.

    Unlike test_all_routes and test_route, the resource group name is the
    first positional argument here and the IoT hub name the second.

    Args:
        resource_group_name: Name of the resource group
        iot_hub_name: Name of the IoT hub

    Returns:
        ItemPaged[EndpointHealthData]: Health status for all configured endpoints
    """


from azure.identity import DefaultAzureCredential
from azure.mgmt.iothub import IotHubClient
from azure.mgmt.iothub.models import (
    TestAllRoutesInput, RoutingMessage, RoutingTwin, RoutingTwinProperties
)

credential = DefaultAzureCredential()
client = IotHubClient(credential, "subscription-id")

# Create test message with device properties
test_message = RoutingMessage(
    body="temperature:25.5,humidity:60.2",
    app_properties={"messageType": "telemetry", "sensorLocation": "warehouse"},
    system_properties={"connectionDeviceId": "sensor-001", "enqueuedTime": "2023-12-01T10:30:00Z"},
)

# Create test twin properties
twin_properties = RoutingTwinProperties(
    desired={"reportingInterval": 300},
    reported={"lastReporting": "2023-12-01T10:25:00Z", "batteryLevel": 85},
)
test_twin = RoutingTwin(
    tags={"location": "warehouse", "deviceType": "environmental"},
    properties=twin_properties,
)

# Test all configured routes
test_input = TestAllRoutesInput(
    routing_source="DeviceMessages",
    message=test_message,
    twin=test_twin,
)
# test_all_routes takes the hub name first, then the resource group.
test_result = client.iot_hub_resource.test_all_routes(
    "my-iot-hub",
    "my-resource-group",
    test_input,
)

# The result only lists the routes that matched, and `routes` is optional.
# Each entry is a MatchedRoute whose route definition (name, endpoint names,
# ...) lives under `.properties`; there is no per-entry `result` flag.
matched_routes = test_result.routes or []
print(f"Routes matched: {len(matched_routes)}")
for matched in matched_routes:
    route = matched.properties
    print(f"Route '{route.name}': MATCHED")
    print(f" Endpoints: {', '.join(route.endpoint_names)}")


from azure.mgmt.iothub.models import TestRouteInput, RouteProperties
# Define a new route to test
# NOTE(review): `$body...` conditions are evaluated against a JSON message
# body; the `test_message` body used in this walkthrough is a plain
# "key:value" string, so this condition presumably cannot match it - confirm
# and switch the sample body to JSON if a match is expected.
new_route = RouteProperties(
    name="high-temperature-alert",
    source="DeviceMessages",
    condition="$body.temperature > 30",
    endpoint_names=["alert-endpoint"],
    is_enabled=True,
)
route_test_input = TestRouteInput(
    message=test_message,
    route=new_route,
    twin=test_twin,
)
# test_route takes the hub name first, then the resource group.
route_result = client.iot_hub_resource.test_route(
    "my-iot-hub",
    "my-resource-group",
    route_test_input,
)
print(f"Route test result: {route_result.result}")
# `details` is optional; only report compilation errors when it is present.
if route_result.details:
    print(f"Details: {route_result.details.compilation_errors}")

# Get health status for all endpoints
# get_endpoint_health takes the resource group first, then the hub name.
# The pager is materialised into a list because it is iterated twice below.
endpoint_health = list(client.iot_hub_resource.get_endpoint_health(
    "my-resource-group",
    "my-iot-hub",
))

# Health status values are lower-case strings ("unknown", "healthy",
# "degraded", "unhealthy", "dead"), so compare against lower-case literals.
for endpoint in endpoint_health:
    print(f"Endpoint: {endpoint.endpoint_id}")
    print(f" Health Status: {endpoint.health_status}")
    if endpoint.health_status != "healthy":
        print(f" Last Error: {endpoint.last_known_error}")
        print(f" Last Error Time: {endpoint.last_known_error_time}")
    print(f" Last Successful Send: {endpoint.last_successful_send_attempt_time}")
    print(f" Last Send Attempt: {endpoint.last_send_attempt_time}")

# Check for unhealthy endpoints
unhealthy_endpoints = [
    ep for ep in endpoint_health
    if ep.health_status in ("unhealthy", "dead", "degraded")
]
if unhealthy_endpoints:
    print(f"\nWARNING: {len(unhealthy_endpoints)} unhealthy endpoints detected!")
    for ep in unhealthy_endpoints:
        print(f" {ep.endpoint_id}: {ep.health_status}")


from azure.mgmt.iothub.models import (
    IotHubDescription, IotHubProperties, RoutingProperties,
    RoutingEndpoints, RouteProperties, FallbackRouteProperties,
    RoutingEventHubProperties, RoutingServiceBusQueueEndpointProperties,
    RoutingStorageContainerProperties, EnrichmentProperties
)
# Define custom endpoints for routing
event_hub_endpoint = RoutingEventHubProperties(
    name="telemetry-events",
    endpoint_uri="sb://mynamespace.servicebus.windows.net/",
    entity_path="telemetry-hub",
)
service_bus_endpoint = RoutingServiceBusQueueEndpointProperties(
    name="alerts-queue",
    endpoint_uri="sb://mynamespace.servicebus.windows.net/",
    entity_path="alerts",
)
storage_endpoint = RoutingStorageContainerProperties(
    name="data-archive",
    endpoint_uri="https://mystorageaccount.blob.core.windows.net/",
    container_name="iot-data",
    file_name_format="{iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}_{deviceId}.json",
    encoding="JSON",
)
routing_endpoints = RoutingEndpoints(
    event_hubs=[event_hub_endpoint],
    service_bus_queues=[service_bus_endpoint],
    storage_containers=[storage_endpoint],
)

# Define routing rules; each route targets one of the endpoint names above
telemetry_route = RouteProperties(
    name="telemetry-to-eventhub",
    source="DeviceMessages",
    condition="$body.messageType = 'telemetry'",
    endpoint_names=["telemetry-events"],
    is_enabled=True,
)
alert_route = RouteProperties(
    name="high-temp-alerts",
    source="DeviceMessages",
    condition="$body.temperature > 35",
    endpoint_names=["alerts-queue"],
    is_enabled=True,
)
archive_route = RouteProperties(
    name="data-archive",
    source="DeviceMessages",
    condition="true",  # Archive all messages
    endpoint_names=["data-archive"],
    is_enabled=True,
)

# Define fallback route
fallback_route = FallbackRouteProperties(
    name="$fallback",
    source="DeviceMessages",
    condition="true",
    endpoint_names=["events"],  # Built-in Event Hub endpoint
    is_enabled=True,
)

# Define message enrichments
enrichments = [
    EnrichmentProperties(
        key="hubName",
        value="$iothubname",
        endpoint_names=["telemetry-events", "data-archive"],
    ),
    EnrichmentProperties(
        key="deviceLocation",
        value="$twin.tags.location",
        endpoint_names=["alerts-queue"],
    ),
]

# Complete routing configuration
routing_config = RoutingProperties(
    endpoints=routing_endpoints,
    routes=[telemetry_route, alert_route, archive_route],
    fallback_route=fallback_route,
    enrichments=enrichments,
)
print("Routing configuration created successfully")
# This line reports the number of routes, so it is labelled as such.
print(f"Routes: {len(routing_config.routes)} routes defined")
print(f"Message enrichments: {len(routing_config.enrichments)} enrichments defined")


class TestAllRoutesInput:
    """
    Input for testing all configured routes.

    Attributes:
        routing_source: Source of the routing message
        message: Test message payload
        twin: Device twin for routing evaluation (optional)
    """

    routing_source: Optional[RoutingSource]
    message: Optional[RoutingMessage]
    twin: Optional[RoutingTwin]
class TestAllRoutesResult:
    """
    Result of testing all routes.

    Attributes:
        routes: Results for each route tested
    """

    routes: Optional[List[MatchedRoute]]
class TestRouteInput:
    """
    Input for testing a specific route.

    Attributes:
        message: Test message payload
        route: Route definition to test
        twin: Device twin for routing evaluation (optional)
    """

    message: Optional[RoutingMessage]
    route: Optional[RouteProperties]
    twin: Optional[RoutingTwin]
class TestRouteResult:
    """
    Result of testing a specific route.

    Attributes:
        result: Whether the route matched (true/false)
        details: Additional details including compilation errors
    """

    result: Optional[TestResultStatus]
    details: Optional[TestRouteResultDetails]
class RoutingMessage:
    """
    Message payload for routing tests.

    Attributes:
        body: Message body content
        app_properties: Application-defined message properties
        system_properties: System-defined message properties
    """

    body: Optional[str]
    app_properties: Optional[Dict[str, str]]
    system_properties: Optional[Dict[str, str]]
class EndpointHealthData:
    """
    Health status information for routing endpoint.

    Attributes:
        endpoint_id: Unique endpoint identifier (readonly)
        health_status: Current health status (readonly)
        last_known_error: Last error message (readonly)
        last_known_error_time: Timestamp of last error (readonly)
        last_successful_send_attempt_time: Last successful message send (readonly)
        last_send_attempt_time: Last send attempt timestamp (readonly)
    """

    endpoint_id: Optional[str]
    health_status: Optional[EndpointHealthStatus]
    last_known_error: Optional[str]
    last_known_error_time: Optional[datetime]
    last_successful_send_attempt_time: Optional[datetime]
    last_send_attempt_time: Optional[datetime]
class RoutingProperties:
    """
    Complete message routing configuration.

    Attributes:
        endpoints: Custom routing endpoints
        routes: Routing rules
        fallback_route: Default route for unmatched messages
        enrichments: Message enrichment rules
    """

    endpoints: Optional[RoutingEndpoints]
    routes: Optional[List[RouteProperties]]
    fallback_route: Optional[FallbackRouteProperties]
    enrichments: Optional[List[EnrichmentProperties]]
class RouteProperties:
    """
    Individual routing rule configuration.

    Attributes:
        name: Route name (required)
        source: Message source type (required)
        condition: Routing filter condition
        endpoint_names: Target endpoint names (required)
        is_enabled: Whether route is active (required)
    """

    name: str
    source: RoutingSource
    condition: Optional[str]
    endpoint_names: List[str]
    is_enabled: bool


# Install with Tessl CLI
#   npx tessl i tessl/pypi-azure-mgmt-iothub