or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-security.mdcore-client.mderror-handling.mdindex.mdmessage-handling.mdtopic-management.md

error-handling.mddocs/

0

# Error Handling

1

2

Structured exception hierarchy for MQTT errors, replacing callback-based error reporting with modern exception handling patterns.

3

4

## Capabilities

5

6

### Base MQTT Exception

7

8

The base exception class for all MQTT-related errors in aiomqtt.

9

10

```python { .api }

11

class MqttError(Exception):

12

"""

13

Base exception for all MQTT-related errors.

14

15

This exception is the parent class for all MQTT-specific errors

16

in aiomqtt, providing a common base for catching any MQTT-related

17

exception.

18

"""

19

```

20

21

**Usage example:**

22

23

```python

24

import asyncio

25

from aiomqtt import Client, MqttError

26

27

async def basic_error_handling():

28

try:

29

async with Client("invalid-broker-address") as client:

30

await client.publish("test/topic", "message")

31

except MqttError as e:

32

print(f"MQTT error occurred: {e}")

33

except Exception as e:

34

print(f"Non-MQTT error: {e}")

35

36

asyncio.run(basic_error_handling())

37

```

38

39

### MQTT Code Errors

40

41

Exception class for errors that include MQTT return codes or reason codes.

42

43

```python { .api }

44

class MqttCodeError(MqttError):

45

"""

46

MQTT error with return code or reason code information.

47

48

This exception includes additional context about the specific

49

MQTT error condition through return codes (MQTT v3.1.1) or

50

reason codes (MQTT v5.0).

51

"""

52

53

rc: int | ReasonCode | None

54

55

def __str__(self) -> str:

56

"""

57

Get formatted error message including return/reason code.

58

59

Returns:

60

str: Formatted error message with code information

61

"""

62

```

63

64

**Common MQTT return codes:**

65

- `0`: Success

66

- `1`: Unacceptable protocol version

67

- `2`: Identifier rejected

68

- `3`: Server unavailable

69

- `4`: Bad username or password

70

- `5`: Not authorized

71

72

**Usage examples:**

73

74

```python

75

import asyncio

76

from aiomqtt import Client, MqttCodeError

77

78

async def code_error_handling():

79

try:

80

async with Client(

81

"broker.example.com",

82

username="invalid_user",

83

password="wrong_password"

84

) as client:

85

await client.publish("test/topic", "message")

86

except MqttCodeError as e:

87

print(f"MQTT error with code {e.rc}: {e}")

88

89

# Handle specific error codes

90

if e.rc == 4: # Bad username or password

91

print("Authentication failed - check credentials")

92

elif e.rc == 5: # Not authorized

93

print("Authorization failed - insufficient permissions")

94

else:

95

print(f"Other MQTT error: {e.rc}")

96

except Exception as e:

97

print(f"Non-MQTT error: {e}")

98

99

async def publish_error_handling():

100

try:

101

async with Client("test.mosquitto.org") as client:

102

# Try to publish to a topic that might be restricted

103

await client.publish("$SYS/restricted", "should fail")

104

except MqttCodeError as e:

105

print(f"Publish failed with code {e.rc}: {e}")

106

107

# Handle publish-specific errors

108

if hasattr(e, 'rc') and e.rc:

109

print(f"Broker rejected publish with reason: {e.rc}")

110

except Exception as e:

111

print(f"Unexpected error: {e}")

112

113

# Run examples

114

asyncio.run(code_error_handling())

115

```

116

117

118

### Reentrant Usage Errors

119

120

Exception for improper concurrent usage of the client.

121

122

```python { .api }

123

class MqttReentrantError(MqttError):

124

"""

125

Error for reentrant client usage.

126

127

Raised when the client is used concurrently in a way that

128

violates the client's usage patterns or when attempting

129

to use the client in multiple contexts simultaneously.

130

"""

131

```

132

133

**Usage examples:**

134

135

```python

136

import asyncio

137

from aiomqtt import Client, MqttReentrantError

138

139

async def reentrant_error_example():

140

client = Client("test.mosquitto.org")

141

142

try:

143

# This would cause a reentrant error

144

async with client:

145

# Attempting to use the same client instance

146

# in multiple contexts simultaneously

147

async with client: # This will raise MqttReentrantError

148

await client.publish("test/topic", "message")

149

except MqttReentrantError as e:

150

print(f"Reentrant usage error: {e}")

151

print("Each client instance should only be used in one context")

152

153

async def proper_client_usage():

154

# Correct way: use one client per context

155

async with Client("test.mosquitto.org") as client1:

156

await client1.publish("test/topic1", "message1")

157

158

# Create a new client for another context

159

async with Client("test.mosquitto.org") as client2:

160

await client2.publish("test/topic2", "message2")

161

162

# Run examples

163

asyncio.run(reentrant_error_example())

164

asyncio.run(proper_client_usage())

165

```

166

167

### Comprehensive Error Handling Patterns

168

169

Best practices for handling different types of errors in MQTT applications.

170

171

**Usage examples:**

172

173

```python

174

import asyncio

175

import logging

176

from aiomqtt import (

177

Client,

178

MqttError,

179

MqttCodeError,

180

MqttReentrantError

181

)

182

183

# Configure logging

184

logging.basicConfig(level=logging.INFO)

185

logger = logging.getLogger(__name__)

186

187

async def comprehensive_error_handling():

188

"""Example of comprehensive error handling for MQTT operations."""

189

190

max_retries = 3

191

retry_delay = 5.0

192

193

for attempt in range(max_retries):

194

try:

195

async with Client(

196

"test.mosquitto.org",

197

username="test_user",

198

password="test_pass"

199

) as client:

200

# Subscribe with error handling

201

try:

202

await client.subscribe("sensors/+/data")

203

logger.info("Successfully subscribed")

204

except MqttCodeError as e:

205

logger.error(f"Subscription failed: {e}")

206

continue

207

208

# Publish with error handling

209

try:

210

await client.publish("status/online", "connected")

211

logger.info("Status published")

212

except MqttCodeError as e:

213

logger.error(f"Publish failed: {e}")

214

215

# Message processing with error handling

216

async for message in client.messages:

217

try:

218

await process_message(message)

219

except Exception as e:

220

logger.error(f"Message processing failed: {e}")

221

continue

222

223

break # Success, exit retry loop

224

225

226

except MqttReentrantError as e:

227

logger.error(f"Client usage error: {e}")

228

raise # Don't retry reentrant errors

229

230

except MqttCodeError as e:

231

logger.error(f"MQTT protocol error: {e}")

232

233

# Handle specific error codes

234

if e.rc == 4: # Bad credentials

235

logger.error("Authentication failed - check username/password")

236

raise # Don't retry auth errors

237

elif e.rc == 5: # Not authorized

238

logger.error("Authorization failed - insufficient permissions")

239

raise # Don't retry auth errors

240

else:

241

if attempt < max_retries - 1:

242

await asyncio.sleep(retry_delay)

243

else:

244

raise

245

246

except MqttError as e:

247

logger.error(f"General MQTT error: {e}")

248

if attempt < max_retries - 1:

249

await asyncio.sleep(retry_delay)

250

else:

251

raise

252

253

except Exception as e:

254

logger.error(f"Unexpected error: {e}")

255

raise

256

257

async def process_message(message):

258

"""Process received message with error handling."""

259

try:

260

# Simulate message processing

261

if message.payload == "error":

262

raise ValueError("Simulated processing error")

263

264

logger.info(f"Processed: {message.topic} = {message.payload}")

265

266

except ValueError as e:

267

logger.error(f"Message validation error: {e}")

268

raise

269

except Exception as e:

270

logger.error(f"Unexpected processing error: {e}")

271

raise

272

273

async def graceful_shutdown_example():

274

"""Example of graceful shutdown with error handling."""

275

client = None

276

try:

277

client = Client("test.mosquitto.org")

278

279

async with client:

280

await client.publish("status/online", "connected")

281

282

# Simulate work

283

await asyncio.sleep(1.0)

284

285

except KeyboardInterrupt:

286

logger.info("Shutdown requested by user")

287

except Exception as e:

288

logger.error(f"Error during operation: {e}")

289

finally:

290

# Client context manager handles cleanup automatically

291

logger.info("Cleanup completed")

292

293

# Run comprehensive example

294

if __name__ == "__main__":

295

try:

296

asyncio.run(comprehensive_error_handling())

297

except KeyboardInterrupt:

298

print("Application terminated by user")

299

except Exception as e:

300

print(f"Application error: {e}")

301

```

302

303

### Error Handling Best Practices

304

305

**1. Use specific exception types:**

306

```python

307

try:

308

async with Client("broker.com") as client:

309

await client.publish("topic", "message")

310

except MqttCodeError as e:

311

# Handle protocol errors with codes

312

print(f"Protocol error: {e.rc}")

313

# Check for connection-related error codes

314

if e.rc in [1, 2, 3, 4, 5]: # Connection refused codes

315

print("Connection-related error")

316

except MqttError:

317

# Handle other MQTT errors

318

pass

319

```

320

321

**2. Implement retry logic for transient errors:**

322

```python

323

async def publish_with_retry(client, topic, payload, max_retries=3):

324

for attempt in range(max_retries):

325

try:

326

await client.publish(topic, payload)

327

return # Success

328

except MqttCodeError as e:

329

if e.rc in [4, 5]: # Auth errors - don't retry

330

raise

331

if attempt == max_retries - 1:

332

raise

333

await asyncio.sleep(2 ** attempt) # Exponential backoff

334

```

335

336

**3. Log errors appropriately:**

337

```python

338

import logging

339

340

logger = logging.getLogger(__name__)

341

342

try:

343

async with Client("broker.com") as client:

344

await client.publish("topic", "message")

345

except MqttError as e:

346

logger.error("MQTT operation failed", exc_info=True)

347

# Handle error appropriately

348

```

349

350

**4. Clean up resources properly:**

351

```python

352

# Context manager handles cleanup automatically

353

async with Client("broker.com") as client:

354

# Client will be properly disconnected even if errors occur

355

await client.publish("topic", "message")

356

```