CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-awsiotpythonsdk

SDK for connecting to AWS IoT using Python.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

exception-handling.mddocs/

Exception Handling

Comprehensive exception classes for timeout conditions, operation errors, connection failures, and Greengrass discovery issues. The AWS IoT Python SDK provides specific exceptions for different failure modes, enabling robust error handling and recovery strategies in IoT applications.

Capabilities

Base Exception Classes

Foundation exception classes that provide common error handling patterns.

class operationError(Exception):
    """
    Base class for operation-related errors in AWS IoT operations.
    Raised when MQTT operations fail due to protocol or client issues.
    """

class operationTimeoutException(Exception):
    """
    Base class for timeout-related errors in AWS IoT operations.
    Raised when operations exceed configured timeout periods.
    """

class acceptTimeoutException(Exception):
    """
    Raised when socket accept operation times out.
    Used in low-level connection establishment scenarios.
    """

Connection Exceptions

Exceptions related to MQTT connection establishment and management.

class connectTimeoutException(operationTimeoutException):
    """Raised when MQTT connection attempt times out"""

class connectError(operationError):
    """Raised when MQTT connection fails due to protocol or authentication errors"""

class disconnectTimeoutException(operationTimeoutException):  
    """Raised when MQTT disconnection attempt times out"""

class disconnectError(operationError):
    """Raised when MQTT disconnection fails"""

Publish Operation Exceptions

Exceptions specific to MQTT publish operations and queue management.

class publishTimeoutException(operationTimeoutException):
    """Raised when MQTT publish operation times out (QoS 1)"""

class publishError(operationError):
    """Raised when MQTT publish operation fails"""

class publishQueueFullException(operationError):
    """Raised when offline publish queue is full and configured to reject new messages"""

class publishQueueDisabledException(operationError):
    """Raised when attempting to queue publish messages but queueing is disabled"""

Subscribe Operation Exceptions

Exceptions for MQTT subscription operations and acknowledgment handling.

class subscribeTimeoutException(operationTimeoutException):
    """Raised when MQTT subscribe operation times out"""

class subscribeError(operationError):
    """Raised when MQTT subscribe operation fails"""

class subackError(operationError):
    """Raised when MQTT SUBACK indicates subscription failure"""

class subscribeQueueFullException(operationError):
    """Raised when offline subscribe queue is full"""

class subscribeQueueDisabledException(operationError):
    """Raised when attempting to queue subscribe operations but queueing is disabled"""

Unsubscribe Operation Exceptions

Exceptions for MQTT unsubscribe operations.

class unsubscribeTimeoutException(operationTimeoutException):
    """Raised when MQTT unsubscribe operation times out"""

class unsubscribeError(operationError):
    """Raised when MQTT unsubscribe operation fails"""

class unsubscribeQueueFullException(operationError):
    """Raised when offline unsubscribe queue is full"""

class unsubscribeQueueDisabledException(operationError):
    """Raised when attempting to queue unsubscribe operations but queueing is disabled"""

WebSocket Exceptions

Exceptions specific to WebSocket SigV4 authentication and connection.

class wssNoKeyInEnvironmentError(Exception):
    """Raised when WebSocket SigV4 credentials are missing from environment"""

class wssHandShakeError(Exception):
    """Raised when WebSocket handshake fails during connection establishment"""

Greengrass Discovery Exceptions

Exceptions for Greengrass core discovery operations.

class DiscoveryTimeoutException(Exception):
    """Raised when Greengrass discovery request times out"""

class DiscoveryUnauthorizedException(Exception):
    """Raised when Greengrass discovery authentication fails"""

class DiscoveryDataNotFoundException(Exception):
    """Raised when no Greengrass discovery data found for the specified thing"""

class DiscoveryInvalidRequestException(Exception):
    """Raised when Greengrass discovery request is malformed or invalid"""

class DiscoveryThrottlingException(Exception):
    """Raised when Greengrass discovery requests are being throttled"""

class DiscoveryFailure(Exception):
    """Raised for general Greengrass discovery service failures"""

General Client Exceptions

General-purpose client error for miscellaneous failures.

class ClientError(Exception):
    """General client error for SDK-wide failures and misconfigurations"""

Usage Examples

Basic Exception Handling

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.exception.AWSIoTExceptions import (
    connectTimeoutException,
    connectError,
    publishTimeoutException,
    subscribeTimeoutException
)

# Create and configure client
client = AWSIoTPyMQTT.AWSIoTMQTTClient("errorHandlingClient")
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

# Configure timeouts
client.configureConnectDisconnectTimeout(10)
client.configureMQTTOperationTimeout(5)

try:
    # Attempt connection with timeout handling
    if not client.connect():
        raise connectError("Failed to establish MQTT connection")
    
    print("Connected successfully")
    
    # Attempt publish with timeout handling
    if not client.publish("test/topic", "Hello World", 1):
        raise publishTimeoutException("Publish operation failed")
    
    print("Message published successfully")
    
    # Attempt subscribe with timeout handling
    def messageCallback(client, userdata, message):
        print(f"Received: {message.payload.decode()}")
    
    if not client.subscribe("test/response", 1, messageCallback):
        raise subscribeTimeoutException("Subscribe operation failed")
    
    print("Subscribed successfully")

except connectTimeoutException:
    print("Connection timed out - check network connectivity and endpoint")
    
except connectError:
    print("Connection failed - check credentials and endpoint configuration")
    
except publishTimeoutException:
    print("Publish timed out - message may not have been delivered")
    
except subscribeTimeoutException:
    print("Subscribe timed out - subscription may not be active")
    
except Exception as e:
    print(f"Unexpected error: {e}")
    
finally:
    try:
        client.disconnect()
    except:
        pass  # Ignore disconnect errors in cleanup

Queue Exception Handling

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.exception.AWSIoTExceptions import (
    publishQueueFullException,
    publishQueueDisabledException
)

# Create client with limited queue
client = AWSIoTPyMQTT.AWSIoTMQTTClient("queueClient")
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

# Configure small queue for demonstration
client.configureOfflinePublishQueueing(5, AWSIoTPyMQTT.DROP_NEWEST)

try:
    client.connect()
    
    # Simulate offline condition by disconnecting
    client.disconnect()
    
    # Attempt to publish while offline
    for i in range(10):  # More messages than queue capacity
        try:
            message = f"Offline message {i}"
            if not client.publish("offline/topic", message, 1):
                print(f"Message {i} queued for offline delivery")
            
        except publishQueueFullException:
            print(f"Queue full - message {i} rejected")
            # Implement retry logic or alternative handling
            
        except publishQueueDisabledException:
            print("Queueing is disabled - reconfigure client for offline operation")
            break

except Exception as e:
    print(f"Queue operation failed: {e}")

Greengrass Discovery Exception Handling

from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.exception.AWSIoTExceptions import (
    DiscoveryTimeoutException,
    DiscoveryUnauthorizedException,
    DiscoveryDataNotFoundException,
    DiscoveryThrottlingException,
    DiscoveryFailure
)
import time

def robust_discovery(thing_name, max_retries=3):
    """Perform Greengrass discovery with comprehensive error handling"""
    
    discoveryProvider = DiscoveryInfoProvider()
    discoveryProvider.configureEndpoint("greengrass-ats.iot.region.amazonaws.com")
    discoveryProvider.configureCredentials("rootCA.crt", "certificate.crt", "private.key")
    discoveryProvider.configureTimeout(30)
    
    for attempt in range(max_retries):
        try:
            print(f"Discovery attempt {attempt + 1}")
            
            discoveryInfo = discoveryProvider.discover(thing_name)
            print("Discovery successful")
            return discoveryInfo
            
        except DiscoveryTimeoutException:
            print("Discovery timed out - check network connectivity")
            if attempt < max_retries - 1:
                print("Retrying with longer timeout...")
                discoveryProvider.configureTimeout(60)  # Increase timeout
                time.sleep(5)
                
        except DiscoveryUnauthorizedException:
            print("Discovery authentication failed")
            print("Check certificate validity and thing permissions")
            return None  # Don't retry auth failures
            
        except DiscoveryDataNotFoundException:
            print(f"No discovery data for thing: {thing_name}")
            print("Ensure thing is associated with a Greengrass group")
            return None  # Don't retry data not found
            
        except DiscoveryThrottlingException:
            print("Discovery requests are being throttled")
            if attempt < max_retries - 1:
                backoff_time = 2 ** attempt  # Exponential backoff
                print(f"Backing off for {backoff_time} seconds...")
                time.sleep(backoff_time)
                
        except DiscoveryFailure as e:
            print(f"Discovery service failure: {e}")
            if attempt < max_retries - 1:
                print("Retrying discovery request...")
                time.sleep(2)
                
        except Exception as e:
            print(f"Unexpected discovery error: {e}")
            if attempt < max_retries - 1:
                time.sleep(2)
    
    print(f"Discovery failed after {max_retries} attempts")
    return None

# Usage
discovery_result = robust_discovery("myIoTDevice")
if discovery_result:
    cores = discovery_result.getAllCores()
    print(f"Found {len(cores)} Greengrass cores")
else:
    print("Discovery failed - falling back to direct AWS IoT connection")

Comprehensive Error Recovery

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.exception.AWSIoTExceptions import *
import time
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RobustIoTClient:
    """IoT client with comprehensive error handling and recovery"""
    
    def __init__(self, client_id, endpoint, port=8883):
        self.client_id = client_id
        self.endpoint = endpoint  
        self.port = port
        self.client = None
        self.connected = False
        self.max_retries = 3
        self.retry_delay = 5
        
    def configure_credentials(self, ca_path, key_path, cert_path):
        """Configure client credentials"""
        self.ca_path = ca_path
        self.key_path = key_path
        self.cert_path = cert_path
        
    def create_client(self):
        """Create and configure MQTT client"""
        self.client = AWSIoTPyMQTT.AWSIoTMQTTClient(self.client_id)
        self.client.configureEndpoint(self.endpoint, self.port)
        self.client.configureCredentials(self.ca_path, self.key_path, self.cert_path)
        
        # Configure timeouts and reconnection  
        self.client.configureConnectDisconnectTimeout(10)
        self.client.configureMQTTOperationTimeout(5)
        self.client.configureAutoReconnectBackoffTime(1, 32, 20)
        self.client.configureOfflinePublishQueueing(-1)  # Infinite queue
        
    def connect_with_retry(self):
        """Connect with automatic retry and error handling"""
        for attempt in range(self.max_retries):
            try:
                logger.info(f"Connection attempt {attempt + 1}")
                
                if self.client.connect():
                    self.connected = True
                    logger.info("Connected successfully")
                    return True
                else:
                    raise connectError("Connection returned False")
                    
            except connectTimeoutException:
                logger.warning(f"Connection timeout on attempt {attempt + 1}")
                
            except connectError as e:
                logger.error(f"Connection error: {e}")
                
            except Exception as e:
                logger.error(f"Unexpected connection error: {e}")
                
            if attempt < self.max_retries - 1:
                logger.info(f"Retrying in {self.retry_delay} seconds...")
                time.sleep(self.retry_delay)
        
        logger.error("Failed to connect after all retries")
        return False
        
    def publish_with_retry(self, topic, payload, qos=0):
        """Publish with error handling and retry logic"""
        if not self.connected:
            logger.warning("Not connected - attempting reconnection")
            if not self.connect_with_retry():
                return False
                
        for attempt in range(self.max_retries):
            try:
                if self.client.publish(topic, payload, qos):
                    logger.info(f"Published to {topic}")
                    return True
                else:
                    raise publishError("Publish returned False")
                    
            except publishTimeoutException:
                logger.warning(f"Publish timeout on attempt {attempt + 1}")
                
            except publishQueueFullException:
                logger.warning("Publish queue full - clearing some messages")
                # Could implement queue management logic here
                
            except publishError as e:
                logger.error(f"Publish error: {e}")
                
            except Exception as e:
                logger.error(f"Unexpected publish error: {e}")
                
            if attempt < self.max_retries - 1:
                time.sleep(1)
        
        logger.error(f"Failed to publish to {topic} after all retries")
        return False
        
    def subscribe_with_retry(self, topic, qos, callback):
        """Subscribe with error handling"""
        if not self.connected:
            if not self.connect_with_retry():
                return False
                
        try:
            if self.client.subscribe(topic, qos, callback):
                logger.info(f"Subscribed to {topic}")
                return True
            else:
                raise subscribeError("Subscribe returned False")
                
        except subscribeTimeoutException:
            logger.error("Subscribe operation timed out")
            
        except subackError:
            logger.error("Subscribe was rejected by broker")
            
        except subscribeError as e:
            logger.error(f"Subscribe error: {e}")
            
        except Exception as e:
            logger.error(f"Unexpected subscribe error: {e}")
            
        return False
        
    def disconnect_safely(self):
        """Disconnect with error handling"""
        if not self.connected:
            return
            
        try:
            self.client.disconnect()
            self.connected = False
            logger.info("Disconnected successfully")
            
        except disconnectTimeoutException:
            logger.warning("Disconnect timed out")
            
        except disconnectError as e:
            logger.error(f"Disconnect error: {e}")
            
        except Exception as e:
            logger.error(f"Unexpected disconnect error: {e}")

# Usage example
client = RobustIoTClient("robustClient", "endpoint.iot.region.amazonaws.com")
client.configure_credentials("rootCA.crt", "private.key", "certificate.crt")
client.create_client()

if client.connect_with_retry():
    # Publish with error handling
    client.publish_with_retry("test/topic", "Hello World", 1)
    
    # Subscribe with error handling
    def message_callback(client, userdata, message):
        print(f"Received: {message.payload.decode()}")
    
    client.subscribe_with_retry("test/response", 1, message_callback)
    
    # Keep running
    try:
        time.sleep(30)
    finally:
        client.disconnect_safely()

Types

# Exception hierarchy structure
Exception
├── acceptTimeoutException
├── operationError
│   ├── connectError
│   ├── disconnectError  
│   ├── publishError
│   ├── publishQueueFullException
│   ├── publishQueueDisabledException
│   ├── subscribeError
│   ├── subackError
│   ├── subscribeQueueFullException
│   ├── subscribeQueueDisabledException
│   ├── unsubscribeError
│   ├── unsubscribeQueueFullException
│   └── unsubscribeQueueDisabledException
├── operationTimeoutException
│   ├── connectTimeoutException
│   ├── disconnectTimeoutException
│   ├── publishTimeoutException
│   ├── subscribeTimeoutException
│   └── unsubscribeTimeoutException
├── wssNoKeyInEnvironmentError
├── wssHandShakeError
├── DiscoveryTimeoutException
├── DiscoveryUnauthorizedException
├── DiscoveryDataNotFoundException
├── DiscoveryInvalidRequestException
├── DiscoveryThrottlingException
├── DiscoveryFailure
└── ClientError

# Exception handling best practices
error_handling_patterns = {
    "timeout_exceptions": "Retry with backoff, check network connectivity",
    "auth_exceptions": "Don't retry, check credentials and permissions", 
    "queue_exceptions": "Implement queue management or fallback strategies",
    "discovery_exceptions": "Use exponential backoff, fallback to direct connection",
    "general_errors": "Log error details, implement graceful degradation"
}

Install with Tessl CLI

npx tessl i tessl/pypi-awsiotpythonsdk

docs

device-shadows.md

exception-handling.md

greengrass-discovery.md

index.md

iot-jobs.md

mqtt-client.md

tile.json