0
# Error Handling
1
2
Structured exception hierarchy for MQTT errors, replacing callback-based error reporting with modern exception handling patterns.
3
4
## Capabilities
5
6
### Base MQTT Exception
7
8
The base exception class for all MQTT-related errors in aiomqtt.
9
10
```python { .api }
11
class MqttError(Exception):
12
"""
13
Base exception for all MQTT-related errors.
14
15
This exception is the parent class for all MQTT-specific errors
16
in aiomqtt, providing a common base for catching any MQTT-related
17
exception.
18
"""
19
```
20
21
**Usage example:**
22
23
```python
24
import asyncio
25
from aiomqtt import Client, MqttError
26
27
async def basic_error_handling():
28
try:
29
async with Client("invalid-broker-address") as client:
30
await client.publish("test/topic", "message")
31
except MqttError as e:
32
print(f"MQTT error occurred: {e}")
33
except Exception as e:
34
print(f"Non-MQTT error: {e}")
35
36
asyncio.run(basic_error_handling())
37
```
38
39
### MQTT Code Errors
40
41
Exception class for errors that include MQTT return codes or reason codes.
42
43
```python { .api }
44
class MqttCodeError(MqttError):
45
"""
46
MQTT error with return code or reason code information.
47
48
This exception includes additional context about the specific
49
MQTT error condition through return codes (MQTT v3.1.1) or
50
reason codes (MQTT v5.0).
51
"""
52
53
rc: int | ReasonCode | None
54
55
def __str__(self) -> str:
56
"""
57
Get formatted error message including return/reason code.
58
59
Returns:
60
str: Formatted error message with code information
61
"""
62
```
63
64
**Common MQTT return codes:**
65
- `0`: Success
66
- `1`: Unacceptable protocol version
67
- `2`: Identifier rejected
68
- `3`: Server unavailable
69
- `4`: Bad username or password
70
- `5`: Not authorized
71
72
**Usage examples:**
73
74
```python
75
import asyncio
76
from aiomqtt import Client, MqttCodeError
77
78
async def code_error_handling():
79
try:
80
async with Client(
81
"broker.example.com",
82
username="invalid_user",
83
password="wrong_password"
84
) as client:
85
await client.publish("test/topic", "message")
86
except MqttCodeError as e:
87
print(f"MQTT error with code {e.rc}: {e}")
88
89
# Handle specific error codes
90
if e.rc == 4: # Bad username or password
91
print("Authentication failed - check credentials")
92
elif e.rc == 5: # Not authorized
93
print("Authorization failed - insufficient permissions")
94
else:
95
print(f"Other MQTT error: {e.rc}")
96
except Exception as e:
97
print(f"Non-MQTT error: {e}")
98
99
async def publish_error_handling():
100
try:
101
async with Client("test.mosquitto.org") as client:
102
# Try to publish to a topic that might be restricted
103
await client.publish("$SYS/restricted", "should fail")
104
except MqttCodeError as e:
105
print(f"Publish failed with code {e.rc}: {e}")
106
107
# Handle publish-specific errors
108
if hasattr(e, 'rc') and e.rc:
109
print(f"Broker rejected publish with reason: {e.rc}")
110
except Exception as e:
111
print(f"Unexpected error: {e}")
112
113
# Run examples
114
asyncio.run(code_error_handling())
115
```
116
117
118
### Reentrant Usage Errors
119
120
Exception for improper concurrent usage of the client.
121
122
```python { .api }
123
class MqttReentrantError(MqttError):
124
"""
125
Error for reentrant client usage.
126
127
Raised when the client is used concurrently in a way that
128
violates the client's usage patterns or when attempting
129
to use the client in multiple contexts simultaneously.
130
"""
131
```
132
133
**Usage examples:**
134
135
```python
136
import asyncio
137
from aiomqtt import Client, MqttReentrantError
138
139
async def reentrant_error_example():
140
client = Client("test.mosquitto.org")
141
142
try:
143
# This would cause a reentrant error
144
async with client:
145
# Attempting to use the same client instance
146
# in multiple contexts simultaneously
147
async with client: # This will raise MqttReentrantError
148
await client.publish("test/topic", "message")
149
except MqttReentrantError as e:
150
print(f"Reentrant usage error: {e}")
151
print("Each client instance should only be used in one context")
152
153
async def proper_client_usage():
154
# Correct way: use one client per context
155
async with Client("test.mosquitto.org") as client1:
156
await client1.publish("test/topic1", "message1")
157
158
# Create a new client for another context
159
async with Client("test.mosquitto.org") as client2:
160
await client2.publish("test/topic2", "message2")
161
162
# Run examples
163
asyncio.run(reentrant_error_example())
164
asyncio.run(proper_client_usage())
165
```
166
167
### Comprehensive Error Handling Patterns
168
169
Best practices for handling different types of errors in MQTT applications.
170
171
**Usage examples:**
172
173
```python
174
import asyncio
175
import logging
176
from aiomqtt import (
177
Client,
178
MqttError,
179
MqttCodeError,
180
MqttReentrantError
181
)
182
183
# Configure logging
184
logging.basicConfig(level=logging.INFO)
185
logger = logging.getLogger(__name__)
186
187
async def comprehensive_error_handling():
188
"""Example of comprehensive error handling for MQTT operations."""
189
190
max_retries = 3
191
retry_delay = 5.0
192
193
for attempt in range(max_retries):
194
try:
195
async with Client(
196
"test.mosquitto.org",
197
username="test_user",
198
password="test_pass"
199
) as client:
200
# Subscribe with error handling
201
try:
202
await client.subscribe("sensors/+/data")
203
logger.info("Successfully subscribed")
204
except MqttCodeError as e:
205
logger.error(f"Subscription failed: {e}")
206
continue
207
208
# Publish with error handling
209
try:
210
await client.publish("status/online", "connected")
211
logger.info("Status published")
212
except MqttCodeError as e:
213
logger.error(f"Publish failed: {e}")
214
215
# Message processing with error handling
216
async for message in client.messages:
217
try:
218
await process_message(message)
219
except Exception as e:
220
logger.error(f"Message processing failed: {e}")
221
continue
222
223
break # Success, exit retry loop
224
225
226
except MqttReentrantError as e:
227
logger.error(f"Client usage error: {e}")
228
raise # Don't retry reentrant errors
229
230
except MqttCodeError as e:
231
logger.error(f"MQTT protocol error: {e}")
232
233
# Handle specific error codes
234
if e.rc == 4: # Bad credentials
235
logger.error("Authentication failed - check username/password")
236
raise # Don't retry auth errors
237
elif e.rc == 5: # Not authorized
238
logger.error("Authorization failed - insufficient permissions")
239
raise # Don't retry auth errors
240
else:
241
if attempt < max_retries - 1:
242
await asyncio.sleep(retry_delay)
243
else:
244
raise
245
246
except MqttError as e:
247
logger.error(f"General MQTT error: {e}")
248
if attempt < max_retries - 1:
249
await asyncio.sleep(retry_delay)
250
else:
251
raise
252
253
except Exception as e:
254
logger.error(f"Unexpected error: {e}")
255
raise
256
257
async def process_message(message):
258
"""Process received message with error handling."""
259
try:
260
# Simulate message processing
261
if message.payload == "error":
262
raise ValueError("Simulated processing error")
263
264
logger.info(f"Processed: {message.topic} = {message.payload}")
265
266
except ValueError as e:
267
logger.error(f"Message validation error: {e}")
268
raise
269
except Exception as e:
270
logger.error(f"Unexpected processing error: {e}")
271
raise
272
273
async def graceful_shutdown_example():
274
"""Example of graceful shutdown with error handling."""
275
client = None
276
try:
277
client = Client("test.mosquitto.org")
278
279
async with client:
280
await client.publish("status/online", "connected")
281
282
# Simulate work
283
await asyncio.sleep(1.0)
284
285
except KeyboardInterrupt:
286
logger.info("Shutdown requested by user")
287
except Exception as e:
288
logger.error(f"Error during operation: {e}")
289
finally:
290
# Client context manager handles cleanup automatically
291
logger.info("Cleanup completed")
292
293
# Run comprehensive example
294
if __name__ == "__main__":
295
try:
296
asyncio.run(comprehensive_error_handling())
297
except KeyboardInterrupt:
298
print("Application terminated by user")
299
except Exception as e:
300
print(f"Application error: {e}")
301
```
302
303
### Error Handling Best Practices
304
305
**1. Use specific exception types:**
306
```python
307
try:
308
async with Client("broker.com") as client:
309
await client.publish("topic", "message")
310
except MqttCodeError as e:
311
# Handle protocol errors with codes
312
print(f"Protocol error: {e.rc}")
313
# Check for connection-related error codes
314
if e.rc in [1, 2, 3, 4, 5]: # Connection refused codes
315
print("Connection-related error")
316
except MqttError:
317
# Handle other MQTT errors
318
pass
319
```
320
321
**2. Implement retry logic for transient errors:**
322
```python
323
async def publish_with_retry(client, topic, payload, max_retries=3):
324
for attempt in range(max_retries):
325
try:
326
await client.publish(topic, payload)
327
return # Success
328
except MqttCodeError as e:
329
if e.rc in [4, 5]: # Auth errors - don't retry
330
raise
331
if attempt == max_retries - 1:
332
raise
333
await asyncio.sleep(2 ** attempt) # Exponential backoff
334
```
335
336
**3. Log errors appropriately:**
337
```python
338
import logging
339
340
logger = logging.getLogger(__name__)
341
342
try:
343
async with Client("broker.com") as client:
344
await client.publish("topic", "message")
345
except MqttError as e:
346
logger.error("MQTT operation failed", exc_info=True)
347
# Handle error appropriately
348
```
349
350
**4. Clean up resources properly:**
351
```python
352
# Context manager handles cleanup automatically
353
async with Client("broker.com") as client:
354
# Client will be properly disconnected even if errors occur
355
await client.publish("topic", "message")
356
```