The idiomatic asyncio MQTT client
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Structured exception hierarchy for MQTT errors, replacing callback-based error reporting with modern exception handling patterns.
The base exception class for all MQTT-related errors in aiomqtt.
class MqttError(Exception):
"""
Base exception for all MQTT-related errors.
This exception is the parent class for all MQTT-specific errors
in aiomqtt, providing a common base for catching any MQTT-related
exception.
"""Usage example:
import asyncio
from aiomqtt import Client, MqttError
async def basic_error_handling():
try:
async with Client("invalid-broker-address") as client:
await client.publish("test/topic", "message")
except MqttError as e:
print(f"MQTT error occurred: {e}")
except Exception as e:
print(f"Non-MQTT error: {e}")
asyncio.run(basic_error_handling())Exception class for errors that include MQTT return codes or reason codes.
class MqttCodeError(MqttError):
"""
MQTT error with return code or reason code information.
This exception includes additional context about the specific
MQTT error condition through return codes (MQTT v3.1.1) or
reason codes (MQTT v5.0).
"""
rc: int | ReasonCode | None
def __str__(self) -> str:
"""
Get formatted error message including return/reason code.
Returns:
str: Formatted error message with code information
"""Common MQTT return codes:
0: Success1: Unacceptable protocol version2: Identifier rejected3: Server unavailable4: Bad username or password5: Not authorizedUsage examples:
import asyncio
from aiomqtt import Client, MqttCodeError
async def code_error_handling():
try:
async with Client(
"broker.example.com",
username="invalid_user",
password="wrong_password"
) as client:
await client.publish("test/topic", "message")
except MqttCodeError as e:
print(f"MQTT error with code {e.rc}: {e}")
# Handle specific error codes
if e.rc == 4: # Bad username or password
print("Authentication failed - check credentials")
elif e.rc == 5: # Not authorized
print("Authorization failed - insufficient permissions")
else:
print(f"Other MQTT error: {e.rc}")
except Exception as e:
print(f"Non-MQTT error: {e}")
async def publish_error_handling():
try:
async with Client("test.mosquitto.org") as client:
# Try to publish to a topic that might be restricted
await client.publish("$SYS/restricted", "should fail")
except MqttCodeError as e:
print(f"Publish failed with code {e.rc}: {e}")
# Handle publish-specific errors
if hasattr(e, 'rc') and e.rc:
print(f"Broker rejected publish with reason: {e.rc}")
except Exception as e:
print(f"Unexpected error: {e}")
# Run examples
asyncio.run(code_error_handling())Exception for improper concurrent usage of the client.
class MqttReentrantError(MqttError):
"""
Error for reentrant client usage.
Raised when the client is used concurrently in a way that
violates the client's usage patterns or when attempting
to use the client in multiple contexts simultaneously.
"""Usage examples:
import asyncio
from aiomqtt import Client, MqttReentrantError
async def reentrant_error_example():
client = Client("test.mosquitto.org")
try:
# This would cause a reentrant error
async with client:
# Attempting to use the same client instance
# in multiple contexts simultaneously
async with client: # This will raise MqttReentrantError
await client.publish("test/topic", "message")
except MqttReentrantError as e:
print(f"Reentrant usage error: {e}")
print("Each client instance should only be used in one context")
async def proper_client_usage():
# Correct way: use one client per context
async with Client("test.mosquitto.org") as client1:
await client1.publish("test/topic1", "message1")
# Create a new client for another context
async with Client("test.mosquitto.org") as client2:
await client2.publish("test/topic2", "message2")
# Run examples
asyncio.run(reentrant_error_example())
asyncio.run(proper_client_usage())Best practices for handling different types of errors in MQTT applications.
Usage examples:
import asyncio
import logging
from aiomqtt import (
Client,
MqttError,
MqttCodeError,
MqttReentrantError
)
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def comprehensive_error_handling():
"""Example of comprehensive error handling for MQTT operations."""
max_retries = 3
retry_delay = 5.0
for attempt in range(max_retries):
try:
async with Client(
"test.mosquitto.org",
username="test_user",
password="test_pass"
) as client:
# Subscribe with error handling
try:
await client.subscribe("sensors/+/data")
logger.info("Successfully subscribed")
except MqttCodeError as e:
logger.error(f"Subscription failed: {e}")
continue
# Publish with error handling
try:
await client.publish("status/online", "connected")
logger.info("Status published")
except MqttCodeError as e:
logger.error(f"Publish failed: {e}")
# Message processing with error handling
async for message in client.messages:
try:
await process_message(message)
except Exception as e:
logger.error(f"Message processing failed: {e}")
continue
break # Success, exit retry loop
except MqttReentrantError as e:
logger.error(f"Client usage error: {e}")
raise # Don't retry reentrant errors
except MqttCodeError as e:
logger.error(f"MQTT protocol error: {e}")
# Handle specific error codes
if e.rc == 4: # Bad credentials
logger.error("Authentication failed - check username/password")
raise # Don't retry auth errors
elif e.rc == 5: # Not authorized
logger.error("Authorization failed - insufficient permissions")
raise # Don't retry auth errors
else:
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
else:
raise
except MqttError as e:
logger.error(f"General MQTT error: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(retry_delay)
else:
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
async def process_message(message):
"""Process received message with error handling."""
try:
# Simulate message processing
if message.payload == "error":
raise ValueError("Simulated processing error")
logger.info(f"Processed: {message.topic} = {message.payload}")
except ValueError as e:
logger.error(f"Message validation error: {e}")
raise
except Exception as e:
logger.error(f"Unexpected processing error: {e}")
raise
async def graceful_shutdown_example():
"""Example of graceful shutdown with error handling."""
client = None
try:
client = Client("test.mosquitto.org")
async with client:
await client.publish("status/online", "connected")
# Simulate work
await asyncio.sleep(1.0)
except KeyboardInterrupt:
logger.info("Shutdown requested by user")
except Exception as e:
logger.error(f"Error during operation: {e}")
finally:
# Client context manager handles cleanup automatically
logger.info("Cleanup completed")
# Run comprehensive example
if __name__ == "__main__":
try:
asyncio.run(comprehensive_error_handling())
except KeyboardInterrupt:
print("Application terminated by user")
except Exception as e:
print(f"Application error: {e}")1. Use specific exception types:
try:
async with Client("broker.com") as client:
await client.publish("topic", "message")
except MqttCodeError as e:
# Handle protocol errors with codes
print(f"Protocol error: {e.rc}")
# Check for connection-related error codes
if e.rc in [1, 2, 3, 4, 5]: # Connection refused codes
print("Connection-related error")
except MqttError:
# Handle other MQTT errors
pass2. Implement retry logic for transient errors:
async def publish_with_retry(client, topic, payload, max_retries=3):
for attempt in range(max_retries):
try:
await client.publish(topic, payload)
return # Success
except MqttCodeError as e:
if e.rc in [4, 5]: # Auth errors - don't retry
raise
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # Exponential backoff3. Log errors appropriately:
import logging
logger = logging.getLogger(__name__)
try:
async with Client("broker.com") as client:
await client.publish("topic", "message")
except MqttError as e:
logger.error("MQTT operation failed", exc_info=True)
# Handle error appropriately4. Clean up resources properly:
# Context manager handles cleanup automatically
async with Client("broker.com") as client:
# Client will be properly disconnected even if errors occur
await client.publish("topic", "message")Install with Tessl CLI
npx tessl i tessl/pypi-aiomqtt