0
# Message Routing
1
2
Configuration and management of message routing from devices to various Azure services including Event Hubs, Service Bus, Storage, and CosmosDB. This module enables sophisticated message processing pipelines with routing rules, message enrichment, and endpoint health monitoring for scalable IoT solutions.
3
4
## Capabilities
5
6
### Route Testing and Validation
7
8
Test routing configurations before deployment to ensure messages are correctly routed to intended endpoints based on routing conditions.
9
10
```python { .api }
11
def test_all_routes(
12
iot_hub_name: str,
13
resource_group_name: str,
14
input: TestAllRoutesInput,
15
**kwargs
16
) -> TestAllRoutesResult:
17
"""
18
Test all routes configured in IoT Hub against a sample message.
19
20
Args:
21
iot_hub_name: Name of the IoT hub
22
resource_group_name: Name of the resource group
23
input: Test message and routing configuration
24
25
Returns:
26
TestAllRoutesResult: Results showing which routes matched and their details
27
"""
28
29
def test_route(
30
iot_hub_name: str,
31
resource_group_name: str,
32
input: TestRouteInput,
33
**kwargs
34
) -> TestRouteResult:
35
"""
36
Test a new route for IoT Hub against a sample message.
37
38
Args:
39
iot_hub_name: Name of the IoT hub
40
resource_group_name: Name of the resource group
41
input: Route definition and test message
42
43
Returns:
44
TestRouteResult: Result indicating whether the route matched
45
"""
46
```
47
48
### Endpoint Health Monitoring
49
50
Monitor the health status of routing endpoints to ensure reliable message delivery and identify connectivity issues.
51
52
```python { .api }
53
def get_endpoint_health(
54
resource_group_name: str,
55
iot_hub_name: str,
56
**kwargs
57
) -> ItemPaged[EndpointHealthData]:
58
"""
59
Get health status for routing endpoints.
60
61
Args:
62
resource_group_name: Name of the resource group
63
iot_hub_name: Name of the IoT hub
64
65
Returns:
66
ItemPaged[EndpointHealthData]: Health status for all configured endpoints
67
"""
68
```
69
70
## Usage Examples
71
72
### Testing routing configuration
73
74
```python
75
from azure.identity import DefaultAzureCredential
76
from azure.mgmt.iothub import IotHubClient
77
from azure.mgmt.iothub.models import (
78
TestAllRoutesInput, RoutingMessage, RoutingTwin, RoutingTwinProperties
79
)
80
81
credential = DefaultAzureCredential()
82
client = IotHubClient(credential, "subscription-id")
83
84
# Create test message with device properties
85
test_message = RoutingMessage(
86
body="temperature:25.5,humidity:60.2",
87
app_properties={"messageType": "telemetry", "sensorLocation": "warehouse"},
88
system_properties={"connectionDeviceId": "sensor-001", "enqueuedTime": "2023-12-01T10:30:00Z"}
89
)
90
91
# Create test twin properties
92
twin_properties = RoutingTwinProperties(
93
desired={"reportingInterval": 300},
94
reported={"lastReporting": "2023-12-01T10:25:00Z", "batteryLevel": 85}
95
)
96
97
test_twin = RoutingTwin(
98
tags={"location": "warehouse", "deviceType": "environmental"},
99
properties=twin_properties
100
)
101
102
# Test all configured routes
103
test_input = TestAllRoutesInput(
104
routing_source="DeviceMessages",
105
message=test_message,
106
twin=test_twin
107
)
108
109
test_result = client.iot_hub_resource.test_all_routes(
110
"my-iot-hub",
111
"my-resource-group",
112
test_input
113
)
114
115
print(f"Routes tested: {len(test_result.routes)}")
116
for route in test_result.routes:
117
print(f"Route '{route.name}': {'MATCHED' if route.result == 'true' else 'NO MATCH'}")
118
if route.result == "true":
119
print(f" Endpoints: {', '.join(route.endpoint_names)}")
120
```
121
122
### Testing a specific route
123
124
```python
125
from azure.mgmt.iothub.models import TestRouteInput, RouteProperties
126
127
# Define a new route to test
128
new_route = RouteProperties(
129
name="high-temperature-alert",
130
source="DeviceMessages",
131
condition="$body.temperature > 30",
132
endpoint_names=["alert-endpoint"],
133
is_enabled=True
134
)
135
136
route_test_input = TestRouteInput(
137
message=test_message,
138
route=new_route,
139
twin=test_twin
140
)
141
142
route_result = client.iot_hub_resource.test_route(
143
"my-iot-hub",
144
"my-resource-group",
145
route_test_input
146
)
147
148
print(f"Route test result: {route_result.result}")
149
if route_result.details:
150
print(f"Details: {route_result.details.compilation_errors}")
151
```
152
153
### Monitoring endpoint health
154
155
```python
156
# Get health status for all endpoints
157
endpoint_health = list(client.iot_hub_resource.get_endpoint_health(
158
"my-resource-group",
159
"my-iot-hub"
160
))
161
162
for endpoint in endpoint_health:
163
print(f"Endpoint: {endpoint.endpoint_id}")
164
print(f" Health Status: {endpoint.health_status}")
165
166
if endpoint.health_status != "Healthy":
167
print(f" Last Error: {endpoint.last_known_error}")
168
print(f" Last Error Time: {endpoint.last_known_error_time}")
169
170
print(f" Last Successful Send: {endpoint.last_successful_send_attempt_time}")
171
print(f" Last Send Attempt: {endpoint.last_send_attempt_time}")
172
173
# Check for unhealthy endpoints
174
unhealthy_endpoints = [
175
ep for ep in endpoint_health
176
if ep.health_status in ["Unhealthy", "Dead", "Degraded"]
177
]
178
179
if unhealthy_endpoints:
180
print(f"\nWARNING: {len(unhealthy_endpoints)} unhealthy endpoints detected!")
181
for ep in unhealthy_endpoints:
182
print(f" {ep.endpoint_id}: {ep.health_status}")
183
```
184
185
### Comprehensive routing configuration example
186
187
```python
188
from azure.mgmt.iothub.models import (
189
IotHubDescription, IotHubProperties, RoutingProperties,
190
RoutingEndpoints, RouteProperties, FallbackRouteProperties,
191
RoutingEventHubProperties, RoutingServiceBusQueueEndpointProperties,
192
RoutingStorageContainerProperties, EnrichmentProperties
193
)
194
195
# Define custom endpoints for routing
196
event_hub_endpoint = RoutingEventHubProperties(
197
name="telemetry-events",
198
endpoint_uri="sb://mynamespace.servicebus.windows.net/",
199
entity_path="telemetry-hub"
200
)
201
202
service_bus_endpoint = RoutingServiceBusQueueEndpointProperties(
203
name="alerts-queue",
204
endpoint_uri="sb://mynamespace.servicebus.windows.net/",
205
entity_path="alerts"
206
)
207
208
storage_endpoint = RoutingStorageContainerProperties(
209
name="data-archive",
210
endpoint_uri="https://mystorageaccount.blob.core.windows.net/",
211
container_name="iot-data",
212
file_name_format="{iothub}/{partition}/{YYYY}/{MM}/{DD}/{HH}/{mm}_{deviceId}.json",
213
encoding="JSON"
214
)
215
216
routing_endpoints = RoutingEndpoints(
217
event_hubs=[event_hub_endpoint],
218
service_bus_queues=[service_bus_endpoint],
219
storage_containers=[storage_endpoint]
220
)
221
222
# Define routing rules
223
telemetry_route = RouteProperties(
224
name="telemetry-to-eventhub",
225
source="DeviceMessages",
226
condition="$body.messageType = 'telemetry'",
227
endpoint_names=["telemetry-events"],
228
is_enabled=True
229
)
230
231
alert_route = RouteProperties(
232
name="high-temp-alerts",
233
source="DeviceMessages",
234
condition="$body.temperature > 35",
235
endpoint_names=["alerts-queue"],
236
is_enabled=True
237
)
238
239
archive_route = RouteProperties(
240
name="data-archive",
241
source="DeviceMessages",
242
condition="true", # Archive all messages
243
endpoint_names=["data-archive"],
244
is_enabled=True
245
)
246
247
# Define fallback route
248
fallback_route = FallbackRouteProperties(
249
name="$fallback",
250
source="DeviceMessages",
251
condition="true",
252
endpoint_names=["events"], # Built-in Event Hub endpoint
253
is_enabled=True
254
)
255
256
# Define message enrichments
257
enrichments = [
258
EnrichmentProperties(
259
key="hubName",
260
value="$iothubname",
261
endpoint_names=["telemetry-events", "data-archive"]
262
),
263
EnrichmentProperties(
264
key="deviceLocation",
265
value="$twin.tags.location",
266
endpoint_names=["alerts-queue"]
267
)
268
]
269
270
# Complete routing configuration
271
routing_config = RoutingProperties(
272
endpoints=routing_endpoints,
273
routes=[telemetry_route, alert_route, archive_route],
274
fallback_route=fallback_route,
275
enrichments=enrichments
276
)
277
278
print("Routing configuration created successfully")
279
print(f"Custom endpoints: {len(routing_config.routes)} routes defined")
280
print(f"Message enrichments: {len(routing_config.enrichments)} enrichments defined")
281
```
282
283
## Types
284
285
```python { .api }
286
class TestAllRoutesInput:
287
"""
288
Input for testing all configured routes.
289
290
Attributes:
291
routing_source: Source of the routing message
292
message: Test message payload
293
twin: Device twin for routing evaluation (optional)
294
"""
295
routing_source: Optional[RoutingSource]
296
message: Optional[RoutingMessage]
297
twin: Optional[RoutingTwin]
298
299
class TestAllRoutesResult:
300
"""
301
Result of testing all routes.
302
303
Attributes:
304
routes: Results for each route tested
305
"""
306
routes: Optional[List[MatchedRoute]]
307
308
class TestRouteInput:
309
"""
310
Input for testing a specific route.
311
312
Attributes:
313
message: Test message payload
314
route: Route definition to test
315
twin: Device twin for routing evaluation (optional)
316
"""
317
message: Optional[RoutingMessage]
318
route: Optional[RouteProperties]
319
twin: Optional[RoutingTwin]
320
321
class TestRouteResult:
322
"""
323
Result of testing a specific route.
324
325
Attributes:
326
result: Whether the route matched (true/false)
327
details: Additional details including compilation errors
328
"""
329
result: Optional[TestResultStatus]
330
details: Optional[TestRouteResultDetails]
331
332
class RoutingMessage:
333
"""
334
Message payload for routing tests.
335
336
Attributes:
337
body: Message body content
338
app_properties: Application-defined message properties
339
system_properties: System-defined message properties
340
"""
341
body: Optional[str]
342
app_properties: Optional[Dict[str, str]]
343
system_properties: Optional[Dict[str, str]]
344
345
class EndpointHealthData:
346
"""
347
Health status information for routing endpoint.
348
349
Attributes:
350
endpoint_id: Unique endpoint identifier (readonly)
351
health_status: Current health status (readonly)
352
last_known_error: Last error message (readonly)
353
last_known_error_time: Timestamp of last error (readonly)
354
last_successful_send_attempt_time: Last successful message send (readonly)
355
last_send_attempt_time: Last send attempt timestamp (readonly)
356
"""
357
endpoint_id: Optional[str]
358
health_status: Optional[EndpointHealthStatus]
359
last_known_error: Optional[str]
360
last_known_error_time: Optional[datetime]
361
last_successful_send_attempt_time: Optional[datetime]
362
last_send_attempt_time: Optional[datetime]
363
364
class RoutingProperties:
365
"""
366
Complete message routing configuration.
367
368
Attributes:
369
endpoints: Custom routing endpoints
370
routes: Routing rules
371
fallback_route: Default route for unmatched messages
372
enrichments: Message enrichment rules
373
"""
374
endpoints: Optional[RoutingEndpoints]
375
routes: Optional[List[RouteProperties]]
376
fallback_route: Optional[FallbackRouteProperties]
377
enrichments: Optional[List[EnrichmentProperties]]
378
379
class RouteProperties:
380
"""
381
Individual routing rule configuration.
382
383
Attributes:
384
name: Route name (required)
385
source: Message source type (required)
386
condition: Routing filter condition
387
endpoint_names: Target endpoint names (required)
388
is_enabled: Whether route is active (required)
389
"""
390
name: str
391
source: RoutingSource
392
condition: Optional[str]
393
endpoint_names: List[str]
394
is_enabled: bool
395
```