CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-azure-mgmt-iothub

Microsoft Azure IoT Hub Management Client Library for programmatic management of Azure IoT Hub resources through the Azure Resource Manager API

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/message-routing.md

Message Routing

Configuration and management of message routing from devices to various Azure services, including Event Hubs, Service Bus, Storage, and Azure Cosmos DB. This module enables sophisticated message-processing pipelines with routing rules, message enrichment, and endpoint health monitoring for scalable IoT solutions.

Capabilities

Route Testing and Validation

Test routing configurations before deployment to ensure messages are correctly routed to intended endpoints based on routing conditions.

def test_all_routes(
    iot_hub_name: str,
    resource_group_name: str,
    input: TestAllRoutesInput,
    **kwargs
) -> TestAllRoutesResult:
    """
    Test all routes configured in IoT Hub against a sample message.
    
    Args:
        iot_hub_name: Name of the IoT hub
        resource_group_name: Name of the resource group
        input: Test message and routing configuration
        
    Returns:
        TestAllRoutesResult: Results showing which routes matched and their details
    """

def test_route(
    iot_hub_name: str,
    resource_group_name: str,
    input: TestRouteInput,
    **kwargs
) -> TestRouteResult:
    """
    Test a new route for IoT Hub against a sample message.
    
    Args:
        iot_hub_name: Name of the IoT hub
        resource_group_name: Name of the resource group
        input: Route definition and test message
        
    Returns:
        TestRouteResult: Result indicating whether the route matched
    """

Endpoint Health Monitoring

Monitor the health status of routing endpoints to ensure reliable message delivery and identify connectivity issues.

def get_endpoint_health(
    resource_group_name: str,
    iot_hub_name: str,
    **kwargs
) -> ItemPaged[EndpointHealthData]:
    """Retrieve the health status of the hub's routing endpoints.

    Positional order matters here: the resource group name is passed
    before the hub name.

    Args:
        resource_group_name: Name of the resource group.
        iot_hub_name: Name of the IoT hub.
        **kwargs: Additional keyword arguments accepted by the operation.

    Returns:
        ItemPaged[EndpointHealthData]: Health status for all configured
        endpoints.
    """

Usage Examples

Testing routing configuration

from azure.identity import DefaultAzureCredential
from azure.mgmt.iothub import IotHubClient
from azure.mgmt.iothub.models import (
    TestAllRoutesInput, RoutingMessage, RoutingTwin, RoutingTwinProperties
)

credential = DefaultAzureCredential()
client = IotHubClient(credential, "subscription-id")

# Create test message with device properties.
# The body is a JSON document so that body-based route conditions such as
# "$body.temperature > 30" have fields to evaluate.
# NOTE(review): body queries may also require content-type/content-encoding
# system properties on the message - confirm against the routing query docs.
test_message = RoutingMessage(
    body='{"temperature": 25.5, "humidity": 60.2}',
    app_properties={"messageType": "telemetry", "sensorLocation": "warehouse"},
    system_properties={"connectionDeviceId": "sensor-001", "enqueuedTime": "2023-12-01T10:30:00Z"}
)

# Create test twin properties
twin_properties = RoutingTwinProperties(
    desired={"reportingInterval": 300},
    reported={"lastReporting": "2023-12-01T10:25:00Z", "batteryLevel": 85}
)

test_twin = RoutingTwin(
    tags={"location": "warehouse", "deviceType": "environmental"},
    properties=twin_properties
)

# Test all configured routes
test_input = TestAllRoutesInput(
    routing_source="DeviceMessages",
    message=test_message,
    twin=test_twin
)

# Keyword arguments guard against swapping the hub and resource group names:
# this operation takes the hub name first.
test_result = client.iot_hub_resource.test_all_routes(
    iot_hub_name="my-iot-hub",
    resource_group_name="my-resource-group",
    input=test_input
)

# `routes` is optional and contains only the routes that matched. Each entry
# is a MatchedRoute whose route definition is exposed through `.properties`.
matched_routes = test_result.routes or []
print(f"Routes matched: {len(matched_routes)}")
for route in matched_routes:
    route_properties = route.properties
    print(f"Route '{route_properties.name}': MATCHED")
    print(f"  Endpoints: {', '.join(route_properties.endpoint_names)}")

Testing a specific route

from azure.mgmt.iothub.models import TestRouteInput, RouteProperties

# Route definition that will be evaluated against the sample message
new_route = RouteProperties(
    is_enabled=True,
    endpoint_names=["alert-endpoint"],
    condition="$body.temperature > 30",
    source="DeviceMessages",
    name="high-temperature-alert",
)

# Bundle the route with the sample message and twin
route_test_input = TestRouteInput(
    twin=test_twin,
    route=new_route,
    message=test_message,
)

route_result = client.iot_hub_resource.test_route(
    "my-iot-hub", "my-resource-group", route_test_input
)

print(f"Route test result: {route_result.result}")

# Details are only printed when the service returned any
details = route_result.details
if details:
    print(f"Details: {details.compilation_errors}")

Monitoring endpoint health

def _health_status(endpoint):
    """Return the endpoint's health status as a lowercase string.

    The service reports lowercase values ("healthy", "unhealthy", "dead",
    "degraded", "unknown"), so comparisons are normalised to lowercase.
    A missing status is treated as "unknown".
    """
    return (endpoint.health_status or "unknown").lower()


# Get health status for all endpoints. The pager is materialised into a list
# because the results are iterated twice below.
endpoint_health = list(client.iot_hub_resource.get_endpoint_health(
    "my-resource-group",
    "my-iot-hub"
))

for endpoint in endpoint_health:
    print(f"Endpoint: {endpoint.endpoint_id}")
    print(f"  Health Status: {endpoint.health_status}")

    # Only show error details for endpoints that are not reported healthy
    if _health_status(endpoint) != "healthy":
        print(f"  Last Error: {endpoint.last_known_error}")
        print(f"  Last Error Time: {endpoint.last_known_error_time}")

    print(f"  Last Successful Send: {endpoint.last_successful_send_attempt_time}")
    print(f"  Last Send Attempt: {endpoint.last_send_attempt_time}")

# Check for unhealthy endpoints
unhealthy_endpoints = [
    ep for ep in endpoint_health
    if _health_status(ep) in {"unhealthy", "dead", "degraded"}
]

if unhealthy_endpoints:
    print(f"\nWARNING: {len(unhealthy_endpoints)} unhealthy endpoints detected!")
    for ep in unhealthy_endpoints:
        print(f"  {ep.endpoint_id}: {ep.health_status}")

Comprehensive routing configuration example

from azure.mgmt.iothub.models import (
    IotHubDescription, IotHubProperties, RoutingProperties,
    RoutingEndpoints, RouteProperties, FallbackRouteProperties,
    RoutingEventHubProperties, RoutingServiceBusQueueEndpointProperties,
    RoutingStorageContainerProperties, EnrichmentProperties
)

# Define custom endpoints for routing
event_hub_endpoint = RoutingEventHubProperties(
    name="telemetry-events",
    endpoint_uri="sb://mynamespace.servicebus.windows.net/",
    entity_path="telemetry-hub"
)

service_bus_endpoint = RoutingServiceBusQueueEndpointProperties(
    name="alerts-queue",
    endpoint_uri="sb://mynamespace.servicebus.windows.net/",
    entity_path="alerts"
)

# NOTE(review): the documented default blob name format is
# {iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}; confirm that the
# {deviceId} token used below is accepted by the service.
storage_endpoint = RoutingStorageContainerProperties(
    name="data-archive",
    endpoint_uri="https://mystorageaccount.blob.core.windows.net/",
    container_name="iot-data",
    file_name_format="{iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}_{deviceId}.json",
    encoding="JSON"
)

routing_endpoints = RoutingEndpoints(
    event_hubs=[event_hub_endpoint],
    service_bus_queues=[service_bus_endpoint],
    storage_containers=[storage_endpoint]
)

# Define routing rules. Each route's endpoint_names refers to the `name`
# of one of the custom endpoints defined above.
telemetry_route = RouteProperties(
    name="telemetry-to-eventhub",
    source="DeviceMessages",
    condition="$body.messageType = 'telemetry'",
    endpoint_names=["telemetry-events"],
    is_enabled=True
)

alert_route = RouteProperties(
    name="high-temp-alerts",
    source="DeviceMessages",
    condition="$body.temperature > 35",
    endpoint_names=["alerts-queue"],
    is_enabled=True
)

archive_route = RouteProperties(
    name="data-archive",
    source="DeviceMessages",
    condition="true",  # Archive all messages
    endpoint_names=["data-archive"],
    is_enabled=True
)

# Define fallback route
fallback_route = FallbackRouteProperties(
    name="$fallback",
    source="DeviceMessages",
    condition="true",
    endpoint_names=["events"],  # Built-in Event Hub endpoint
    is_enabled=True
)

# Define message enrichments
enrichments = [
    EnrichmentProperties(
        key="hubName",
        value="$iothubname",
        endpoint_names=["telemetry-events", "data-archive"]
    ),
    EnrichmentProperties(
        key="deviceLocation",
        value="$twin.tags.location",
        endpoint_names=["alerts-queue"]
    )
]

# Complete routing configuration
routing_config = RoutingProperties(
    endpoints=routing_endpoints,
    routes=[telemetry_route, alert_route, archive_route],
    fallback_route=fallback_route,
    enrichments=enrichments
)

# Summarise what was built. Endpoints and routes are counted separately so
# that each label matches the number printed next to it.
custom_endpoints = [event_hub_endpoint, service_bus_endpoint, storage_endpoint]

print("Routing configuration created successfully")
print(f"Custom endpoints: {len(custom_endpoints)} endpoints defined")
print(f"Routes: {len(routing_config.routes)} routes defined")
print(f"Message enrichments: {len(routing_config.enrichments)} enrichments defined")

Types

class TestAllRoutesInput:
    """Request payload used when testing every configured route.

    All three fields are optional.

    Attributes:
        routing_source: Source of the routing message.
        message: Test message payload.
        twin: Device twin for routing evaluation.
    """
    routing_source: Optional[RoutingSource]
    message: Optional[RoutingMessage]
    twin: Optional[RoutingTwin]

class TestAllRoutesResult:
    """Outcome returned after testing all routes.

    Attributes:
        routes: Optional list of ``MatchedRoute`` entries, one per route
            result.
    """
    routes: Optional[List[MatchedRoute]]

class TestRouteInput:
    """
    Input for testing a specific route.

    The route under test must always be supplied; the sample message and
    the device twin are optional.

    Attributes:
        message: Test message payload (optional)
        route: Route definition to test (required)
        twin: Device twin for routing evaluation (optional)
    """
    message: Optional[RoutingMessage]
    route: RouteProperties
    twin: Optional[RoutingTwin]

class TestRouteResult:
    """Outcome returned after testing a single route.

    Attributes:
        result: Whether the route matched (true/false).
        details: Additional details including compilation errors.
    """
    result: Optional[TestResultStatus]
    details: Optional[TestRouteResultDetails]

class RoutingMessage:
    """Sample message supplied to the route-testing operations.

    Attributes:
        body: Message body content, as a string.
        app_properties: Application-defined message properties
            (string keys mapped to string values).
        system_properties: System-defined message properties
            (string keys mapped to string values).
    """
    body: Optional[str]
    app_properties: Optional[Dict[str, str]]
    system_properties: Optional[Dict[str, str]]

class EndpointHealthData:
    """Health report for a single routing endpoint.

    Every attribute is documented as read-only.

    Attributes:
        endpoint_id: Unique endpoint identifier.
        health_status: Current health status.
        last_known_error: Last error message.
        last_known_error_time: Timestamp of the last error.
        last_successful_send_attempt_time: Time of the last successful
            message send.
        last_send_attempt_time: Timestamp of the last send attempt.
    """
    endpoint_id: Optional[str]
    health_status: Optional[EndpointHealthStatus]
    last_known_error: Optional[str]
    last_known_error_time: Optional[datetime]
    last_successful_send_attempt_time: Optional[datetime]
    last_send_attempt_time: Optional[datetime]

class RoutingProperties:
    """Top-level container for a hub's message routing setup.

    All four fields are optional.

    Attributes:
        endpoints: Custom routing endpoints.
        routes: Routing rules.
        fallback_route: Default route for unmatched messages.
        enrichments: Message enrichment rules.
    """
    endpoints: Optional[RoutingEndpoints]
    routes: Optional[List[RouteProperties]]
    fallback_route: Optional[FallbackRouteProperties]
    enrichments: Optional[List[EnrichmentProperties]]

class RouteProperties:
    """Definition of one routing rule.

    Only ``condition`` is optional; every other field is required.

    Attributes:
        name: Route name.
        source: Message source type.
        condition: Routing filter condition.
        endpoint_names: Target endpoint names.
        is_enabled: Whether the route is active.
    """
    name: str
    source: RoutingSource
    condition: Optional[str]
    endpoint_names: List[str]
    is_enabled: bool

Install with Tessl CLI

npx tessl i tessl/pypi-azure-mgmt-iothub

docs

device-operations.md

event-hub-consumer-groups.md

failover-operations.md

index.md

message-routing.md

monitoring-quotas.md

private-networking.md

resource-management.md

security-management.md

utility-operations.md

tile.json