CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aiomqtt

The idiomatic asyncio MQTT client

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

error-handling.mddocs/

Error Handling

Structured exception hierarchy for MQTT errors, replacing callback-based error reporting with modern exception handling patterns.

Capabilities

Base MQTT Exception

The base exception class for all MQTT-related errors in aiomqtt.

class MqttError(Exception):
    """
    Base exception for all MQTT-related errors.
    
    This exception is the parent class for all MQTT-specific errors
    in aiomqtt, providing a common base for catching any MQTT-related
    exception.
    """

Usage example:

import asyncio
from aiomqtt import Client, MqttError

async def basic_error_handling():
    try:
        async with Client("invalid-broker-address") as client:
            await client.publish("test/topic", "message")
    except MqttError as e:
        print(f"MQTT error occurred: {e}")
    except Exception as e:
        print(f"Non-MQTT error: {e}")

asyncio.run(basic_error_handling())

MQTT Code Errors

Exception class for errors that include MQTT return codes or reason codes.

class MqttCodeError(MqttError):
    """
    MQTT error with return code or reason code information.
    
    This exception includes additional context about the specific
    MQTT error condition through return codes (MQTT v3.1.1) or
    reason codes (MQTT v5.0).
    """
    
    rc: int | ReasonCode | None
    
    def __str__(self) -> str:
        """
        Get formatted error message including return/reason code.
        
        Returns:
            str: Formatted error message with code information
        """

Common MQTT return codes:

  • 0: Success
  • 1: Unacceptable protocol version
  • 2: Identifier rejected
  • 3: Server unavailable
  • 4: Bad username or password
  • 5: Not authorized

Usage examples:

import asyncio
from aiomqtt import Client, MqttCodeError

async def code_error_handling():
    try:
        async with Client(
            "broker.example.com",
            username="invalid_user",
            password="wrong_password"
        ) as client:
            await client.publish("test/topic", "message")
    except MqttCodeError as e:
        print(f"MQTT error with code {e.rc}: {e}")
        
        # Handle specific error codes
        if e.rc == 4:  # Bad username or password
            print("Authentication failed - check credentials")
        elif e.rc == 5:  # Not authorized
            print("Authorization failed - insufficient permissions")
        else:
            print(f"Other MQTT error: {e.rc}")
    except Exception as e:
        print(f"Non-MQTT error: {e}")

async def publish_error_handling():
    try:
        async with Client("test.mosquitto.org") as client:
            # Try to publish to a topic that might be restricted
            await client.publish("$SYS/restricted", "should fail")
    except MqttCodeError as e:
        print(f"Publish failed with code {e.rc}: {e}")
        
        # Handle publish-specific errors
        if hasattr(e, 'rc') and e.rc:
            print(f"Broker rejected publish with reason: {e.rc}")
    except Exception as e:
        print(f"Unexpected error: {e}")

# Run examples
asyncio.run(code_error_handling())

Reentrant Usage Errors

Exception for improper concurrent usage of the client.

class MqttReentrantError(MqttError):
    """
    Error for reentrant client usage.
    
    Raised when the client is used concurrently in a way that
    violates the client's usage patterns or when attempting
    to use the client in multiple contexts simultaneously.
    """

Usage examples:

import asyncio
from aiomqtt import Client, MqttReentrantError

async def reentrant_error_example():
    client = Client("test.mosquitto.org")
    
    try:
        # This would cause a reentrant error
        async with client:
            # Attempting to use the same client instance
            # in multiple contexts simultaneously
            async with client:  # This will raise MqttReentrantError
                await client.publish("test/topic", "message")
    except MqttReentrantError as e:
        print(f"Reentrant usage error: {e}")
        print("Each client instance should only be used in one context")

async def proper_client_usage():
    # Correct way: use one client per context
    async with Client("test.mosquitto.org") as client1:
        await client1.publish("test/topic1", "message1")
    
    # Create a new client for another context
    async with Client("test.mosquitto.org") as client2:
        await client2.publish("test/topic2", "message2")

# Run examples
asyncio.run(reentrant_error_example())
asyncio.run(proper_client_usage())

Comprehensive Error Handling Patterns

Best practices for handling different types of errors in MQTT applications.

Usage examples:

import asyncio
import logging
from aiomqtt import (
    Client, 
    MqttError, 
    MqttCodeError, 
    MqttReentrantError
)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def comprehensive_error_handling():
    """Example of comprehensive error handling for MQTT operations."""
    
    max_retries = 3
    retry_delay = 5.0
    
    for attempt in range(max_retries):
        try:
            async with Client(
                "test.mosquitto.org",
                username="test_user",
                password="test_pass"
            ) as client:
                # Subscribe with error handling
                try:
                    await client.subscribe("sensors/+/data")
                    logger.info("Successfully subscribed")
                except MqttCodeError as e:
                    logger.error(f"Subscription failed: {e}")
                    continue
                
                # Publish with error handling
                try:
                    await client.publish("status/online", "connected")
                    logger.info("Status published")
                except MqttCodeError as e:
                    logger.error(f"Publish failed: {e}")
                
                # Message processing with error handling
                async for message in client.messages:
                    try:
                        await process_message(message)
                    except Exception as e:
                        logger.error(f"Message processing failed: {e}")
                        continue
                
                break  # Success, exit retry loop
                
                
        except MqttReentrantError as e:
            logger.error(f"Client usage error: {e}")
            raise  # Don't retry reentrant errors
            
        except MqttCodeError as e:
            logger.error(f"MQTT protocol error: {e}")
            
            # Handle specific error codes
            if e.rc == 4:  # Bad credentials
                logger.error("Authentication failed - check username/password")
                raise  # Don't retry auth errors
            elif e.rc == 5:  # Not authorized
                logger.error("Authorization failed - insufficient permissions")
                raise  # Don't retry auth errors
            else:
                if attempt < max_retries - 1:
                    await asyncio.sleep(retry_delay)
                else:
                    raise
                    
        except MqttError as e:
            logger.error(f"General MQTT error: {e}")
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay)
            else:
                raise
                
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise

async def process_message(message):
    """Process received message with error handling."""
    try:
        # Simulate message processing
        if message.payload == "error":
            raise ValueError("Simulated processing error")
        
        logger.info(f"Processed: {message.topic} = {message.payload}")
        
    except ValueError as e:
        logger.error(f"Message validation error: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected processing error: {e}")
        raise

async def graceful_shutdown_example():
    """Example of graceful shutdown with error handling."""
    client = None
    try:
        client = Client("test.mosquitto.org")
        
        async with client:
            await client.publish("status/online", "connected")
            
            # Simulate work
            await asyncio.sleep(1.0)
            
    except KeyboardInterrupt:
        logger.info("Shutdown requested by user")
    except Exception as e:
        logger.error(f"Error during operation: {e}")
    finally:
        # Client context manager handles cleanup automatically
        logger.info("Cleanup completed")

# Run comprehensive example
if __name__ == "__main__":
    try:
        asyncio.run(comprehensive_error_handling())
    except KeyboardInterrupt:
        print("Application terminated by user")
    except Exception as e:
        print(f"Application error: {e}")

Error Handling Best Practices

1. Use specific exception types:

try:
    async with Client("broker.com") as client:
        await client.publish("topic", "message")
except MqttCodeError as e:
    # Handle protocol errors with codes
    print(f"Protocol error: {e.rc}")
    # Check for connection-related error codes
    if e.rc in [1, 2, 3, 4, 5]:  # Connection refused codes
        print("Connection-related error")
except MqttError:
    # Handle other MQTT errors
    pass

2. Implement retry logic for transient errors:

async def publish_with_retry(client, topic, payload, max_retries=3):
    for attempt in range(max_retries):
        try:
            await client.publish(topic, payload)
            return  # Success
        except MqttCodeError as e:
            if e.rc in [4, 5]:  # Auth errors - don't retry
                raise
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

3. Log errors appropriately:

import logging

logger = logging.getLogger(__name__)

try:
    async with Client("broker.com") as client:
        await client.publish("topic", "message")
except MqttError as e:
    logger.error("MQTT operation failed", exc_info=True)
    # Handle error appropriately

4. Clean up resources properly:

# Context manager handles cleanup automatically
async with Client("broker.com") as client:
    # Client will be properly disconnected even if errors occur
    await client.publish("topic", "message")

Install with Tessl CLI

npx tessl i tessl/pypi-aiomqtt

docs

configuration-security.md

core-client.md

error-handling.md

index.md

message-handling.md

topic-management.md

tile.json