SDK for connecting to AWS IoT using Python.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive exception classes for timeout conditions, operation errors, connection failures, and Greengrass discovery issues. The AWS IoT Python SDK provides specific exceptions for different failure modes, enabling robust error handling and recovery strategies in IoT applications.
Foundation exception classes that provide common error handling patterns.
class operationError(Exception):
    """Base class for operation-related errors in AWS IoT operations.

    Raised when MQTT operations fail due to protocol or client issues.
    """
class operationTimeoutException(Exception):
    """Base class for timeout-related errors in AWS IoT operations.

    Raised when operations exceed configured timeout periods.
    """
class acceptTimeoutException(Exception):
"""
Raised when socket accept operation times out.
Used in low-level connection establishment scenarios.
"""

Exceptions related to MQTT connection establishment and management.
class connectTimeoutException(operationTimeoutException):
    """Raised when MQTT connection attempt times out."""


class connectError(operationError):
    """Raised when MQTT connection fails due to protocol or authentication errors."""


class disconnectTimeoutException(operationTimeoutException):
    """Raised when MQTT disconnection attempt times out."""
class disconnectError(operationError):
    """Raised when MQTT disconnection fails"""

Exceptions specific to MQTT publish operations and queue management.
class publishTimeoutException(operationTimeoutException):
    """Raised when MQTT publish operation times out (QoS 1)."""


class publishError(operationError):
    """Raised when MQTT publish operation fails."""


class publishQueueFullException(operationError):
    """Raised when offline publish queue is full and configured to reject new messages."""
class publishQueueDisabledException(operationError):
    """Raised when attempting to queue publish messages but queueing is disabled"""

Exceptions for MQTT subscription operations and acknowledgment handling.
class subscribeTimeoutException(operationTimeoutException):
    """Raised when MQTT subscribe operation times out."""


class subscribeError(operationError):
    """Raised when MQTT subscribe operation fails."""


class subackError(operationError):
    """Raised when MQTT SUBACK indicates subscription failure."""


class subscribeQueueFullException(operationError):
    """Raised when offline subscribe queue is full."""
class subscribeQueueDisabledException(operationError):
    """Raised when attempting to queue subscribe operations but queueing is disabled"""

Exceptions for MQTT unsubscribe operations.
class unsubscribeTimeoutException(operationTimeoutException):
    """Raised when MQTT unsubscribe operation times out."""


class unsubscribeError(operationError):
    """Raised when MQTT unsubscribe operation fails."""


class unsubscribeQueueFullException(operationError):
    """Raised when offline unsubscribe queue is full."""
class unsubscribeQueueDisabledException(operationError):
    """Raised when attempting to queue unsubscribe operations but queueing is disabled"""

Exceptions specific to WebSocket SigV4 authentication and connection.
class wssNoKeyInEnvironmentError(Exception):
    """Raised when WebSocket SigV4 credentials are missing from environment."""
class wssHandShakeError(Exception):
    """Raised when WebSocket handshake fails during connection establishment"""

Exceptions for Greengrass core discovery operations.
class DiscoveryTimeoutException(Exception):
    """Raised when Greengrass discovery request times out."""


class DiscoveryUnauthorizedException(Exception):
    """Raised when Greengrass discovery authentication fails."""


class DiscoveryDataNotFoundException(Exception):
    """Raised when no Greengrass discovery data found for the specified thing."""


class DiscoveryInvalidRequestException(Exception):
    """Raised when Greengrass discovery request is malformed or invalid."""


class DiscoveryThrottlingException(Exception):
    """Raised when Greengrass discovery requests are being throttled."""
class DiscoveryFailure(Exception):
    """Raised for general Greengrass discovery service failures"""

General-purpose client error for miscellaneous failures.
class ClientError(Exception):
    """General client error for SDK-wide failures and misconfigurations"""

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.exception.AWSIoTExceptions import (
connectTimeoutException,
connectError,
publishTimeoutException,
subscribeTimeoutException
)
# Create and configure client
client = AWSIoTPyMQTT.AWSIoTMQTTClient("errorHandlingClient")
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

# Configure timeouts
client.configureConnectDisconnectTimeout(10)
client.configureMQTTOperationTimeout(5)

try:
    # Attempt connection with timeout handling
    if not client.connect():
        raise connectError("Failed to establish MQTT connection")
    print("Connected successfully")

    # Attempt publish with timeout handling
    if not client.publish("test/topic", "Hello World", 1):
        raise publishTimeoutException("Publish operation failed")
    print("Message published successfully")

    # Attempt subscribe with timeout handling
    def messageCallback(client, userdata, message):
        print(f"Received: {message.payload.decode()}")

    if not client.subscribe("test/response", 1, messageCallback):
        raise subscribeTimeoutException("Subscribe operation failed")
    print("Subscribed successfully")
except connectTimeoutException:
    print("Connection timed out - check network connectivity and endpoint")
except connectError:
    print("Connection failed - check credentials and endpoint configuration")
except publishTimeoutException:
    print("Publish timed out - message may not have been delivered")
except subscribeTimeoutException:
    print("Subscribe timed out - subscription may not be active")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    # Best-effort cleanup: a failed disconnect must not mask the outcome
    # above. Catch Exception rather than using a bare except so that
    # KeyboardInterrupt and SystemExit still propagate.
    try:
        client.disconnect()
    except Exception:
        pass  # Ignore disconnect errors in cleanup

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.exception.AWSIoTExceptions import (
publishQueueFullException,
publishQueueDisabledException
)
# Create client with limited queue
client = AWSIoTPyMQTT.AWSIoTMQTTClient("queueClient")
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

# Configure small queue for demonstration
client.configureOfflinePublishQueueing(5, AWSIoTPyMQTT.DROP_NEWEST)

try:
    client.connect()

    # Simulate offline condition by disconnecting
    client.disconnect()

    # Attempt to publish while offline
    for i in range(10):  # More messages than queue capacity
        try:
            message = f"Offline message {i}"
            # A falsy return is reported as "queued" rather than as a failure.
            if not client.publish("offline/topic", message, 1):
                print(f"Message {i} queued for offline delivery")
        except publishQueueFullException:
            print(f"Queue full - message {i} rejected")
            # Implement retry logic or alternative handling
        except publishQueueDisabledException:
            print("Queueing is disabled - reconfigure client for offline operation")
            break  # No later message can be queued either
except Exception as e:
    print(f"Queue operation failed: {e}")

from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.exception.AWSIoTExceptions import (
DiscoveryTimeoutException,
DiscoveryUnauthorizedException,
DiscoveryDataNotFoundException,
DiscoveryThrottlingException,
DiscoveryFailure
)
import time
def robust_discovery(thing_name, max_retries=3):
    """Perform Greengrass discovery with comprehensive error handling.

    Configures a ``DiscoveryInfoProvider`` and calls ``discover(thing_name)``
    up to ``max_retries`` times. Progress and failures are reported with
    ``print``.

    Args:
        thing_name: Name of the thing passed to ``discover``.
        max_retries: Maximum number of discovery attempts.

    Returns:
        The value returned by ``discover`` on the first successful attempt,
        or ``None`` when discovery is unauthorized, no discovery data is
        found, or every attempt fails.
    """
    discoveryProvider = DiscoveryInfoProvider()
    discoveryProvider.configureEndpoint("greengrass-ats.iot.region.amazonaws.com")
    discoveryProvider.configureCredentials("rootCA.crt", "certificate.crt", "private.key")
    discoveryProvider.configureTimeout(30)

    for attempt in range(max_retries):
        # Only wait and retry when at least one more attempt remains.
        has_more_attempts = attempt < max_retries - 1
        try:
            print(f"Discovery attempt {attempt + 1}")
            discoveryInfo = discoveryProvider.discover(thing_name)
            print("Discovery successful")
            return discoveryInfo
        except DiscoveryTimeoutException:
            print("Discovery timed out - check network connectivity")
            if has_more_attempts:
                print("Retrying with longer timeout...")
                discoveryProvider.configureTimeout(60)  # Increase timeout
                time.sleep(5)
        except DiscoveryUnauthorizedException:
            print("Discovery authentication failed")
            print("Check certificate validity and thing permissions")
            return None  # Don't retry auth failures
        except DiscoveryDataNotFoundException:
            print(f"No discovery data for thing: {thing_name}")
            print("Ensure thing is associated with a Greengrass group")
            return None  # Don't retry data not found
        except DiscoveryThrottlingException:
            print("Discovery requests are being throttled")
            if has_more_attempts:
                backoff_time = 2 ** attempt  # Exponential backoff
                print(f"Backing off for {backoff_time} seconds...")
                time.sleep(backoff_time)
        except DiscoveryFailure as e:
            print(f"Discovery service failure: {e}")
            if has_more_attempts:
                print("Retrying discovery request...")
                time.sleep(2)
        except Exception as e:
            print(f"Unexpected discovery error: {e}")
            if has_more_attempts:
                time.sleep(2)

    print(f"Discovery failed after {max_retries} attempts")
    return None
# Usage
discovery_result = robust_discovery("myIoTDevice")
if discovery_result:
    cores = discovery_result.getAllCores()
    print(f"Found {len(cores)} Greengrass cores")
else:
    # robust_discovery returns None on every failure path.
    print("Discovery failed - falling back to direct AWS IoT connection")

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.exception.AWSIoTExceptions import *
import time
import logging
# Configure logging: set up the root logger at INFO level and create the
# module-level logger used by the RobustIoTClient methods.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RobustIoTClient:
    """IoT client with comprehensive error handling and recovery.

    Wraps an ``AWSIoTPyMQTT.AWSIoTMQTTClient`` and exposes connect, publish,
    subscribe and disconnect helpers that catch the SDK exceptions, log them
    through the module-level ``logger`` and report success as a boolean.

    Typical call order: ``configure_credentials()``, ``create_client()``,
    then ``connect_with_retry()``.
    """

    def __init__(self, client_id, endpoint, port=8883):
        """Store connection settings; no client is created yet.

        Args:
            client_id: Client identifier passed to ``AWSIoTMQTTClient``.
            endpoint: Endpoint host name passed to ``configureEndpoint``.
            port: Endpoint port passed to ``configureEndpoint``.
        """
        self.client_id = client_id
        self.endpoint = endpoint
        self.port = port
        self.client = None
        self.connected = False
        self.max_retries = 3
        self.retry_delay = 5
        # Credential paths stay None until configure_credentials() is called,
        # so create_client() can detect a missing configuration explicitly.
        self.ca_path = None
        self.key_path = None
        self.cert_path = None

    def configure_credentials(self, ca_path, key_path, cert_path):
        """Configure client credentials.

        Args:
            ca_path: Path to the root CA file.
            key_path: Path to the private key file.
            cert_path: Path to the certificate file.
        """
        self.ca_path = ca_path
        self.key_path = key_path
        self.cert_path = cert_path

    def create_client(self):
        """Create and configure MQTT client.

        Raises:
            ValueError: If ``configure_credentials()`` has not been called.
        """
        if any(path is None for path in (self.ca_path, self.key_path, self.cert_path)):
            raise ValueError(
                "Credentials not configured - call configure_credentials() first"
            )

        self.client = AWSIoTPyMQTT.AWSIoTMQTTClient(self.client_id)
        self.client.configureEndpoint(self.endpoint, self.port)
        self.client.configureCredentials(self.ca_path, self.key_path, self.cert_path)

        # Configure timeouts and reconnection
        self.client.configureConnectDisconnectTimeout(10)
        self.client.configureMQTTOperationTimeout(5)
        self.client.configureAutoReconnectBackoffTime(1, 32, 20)
        self.client.configureOfflinePublishQueueing(-1)  # Infinite queue

    def connect_with_retry(self):
        """Connect with automatic retry and error handling.

        Makes up to ``self.max_retries`` attempts, sleeping
        ``self.retry_delay`` seconds between them.

        Returns:
            True once a connection attempt succeeds, False otherwise
            (including when no client has been created).
        """
        if self.client is None:
            # Without a client every attempt would fail identically; skip
            # the pointless retries and sleeps.
            logger.error("Client not created - call create_client() first")
            return False

        for attempt in range(self.max_retries):
            try:
                logger.info(f"Connection attempt {attempt + 1}")
                if self.client.connect():
                    self.connected = True
                    logger.info("Connected successfully")
                    return True
                else:
                    raise connectError("Connection returned False")
            except connectTimeoutException:
                logger.warning(f"Connection timeout on attempt {attempt + 1}")
            except connectError as e:
                logger.error(f"Connection error: {e}")
            except Exception as e:
                logger.error(f"Unexpected connection error: {e}")

            # Pause only when another attempt will follow.
            if attempt < self.max_retries - 1:
                logger.info(f"Retrying in {self.retry_delay} seconds...")
                time.sleep(self.retry_delay)

        logger.error("Failed to connect after all retries")
        return False

    def publish_with_retry(self, topic, payload, qos=0):
        """Publish with error handling and retry logic.

        Reconnects first when not connected, then makes up to
        ``self.max_retries`` publish attempts one second apart.

        Args:
            topic: Topic to publish to.
            payload: Message payload.
            qos: Quality-of-service level passed to ``publish``.

        Returns:
            True if a publish call succeeds, False otherwise.
        """
        if not self.connected:
            logger.warning("Not connected - attempting reconnection")
            if not self.connect_with_retry():
                return False

        for attempt in range(self.max_retries):
            try:
                if self.client.publish(topic, payload, qos):
                    logger.info(f"Published to {topic}")
                    return True
                else:
                    raise publishError("Publish returned False")
            except publishTimeoutException:
                logger.warning(f"Publish timeout on attempt {attempt + 1}")
            except publishQueueFullException:
                logger.warning("Publish queue full - clearing some messages")
                # Could implement queue management logic here
            except publishError as e:
                logger.error(f"Publish error: {e}")
            except Exception as e:
                logger.error(f"Unexpected publish error: {e}")

            if attempt < self.max_retries - 1:
                time.sleep(1)

        logger.error(f"Failed to publish to {topic} after all retries")
        return False

    def subscribe_with_retry(self, topic, qos, callback):
        """Subscribe with error handling.

        Reconnects first when not connected, then makes a single subscribe
        attempt (the subscribe call itself is not retried).

        Args:
            topic: Topic to subscribe to.
            qos: Quality-of-service level passed to ``subscribe``.
            callback: Message callback passed to ``subscribe``.

        Returns:
            True if the subscription succeeds, False otherwise.
        """
        if not self.connected:
            if not self.connect_with_retry():
                return False

        try:
            if self.client.subscribe(topic, qos, callback):
                logger.info(f"Subscribed to {topic}")
                return True
            else:
                raise subscribeError("Subscribe returned False")
        except subscribeTimeoutException:
            logger.error("Subscribe operation timed out")
        except subackError:
            logger.error("Subscribe was rejected by broker")
        except subscribeError as e:
            logger.error(f"Subscribe error: {e}")
        except Exception as e:
            logger.error(f"Unexpected subscribe error: {e}")

        # Every handled failure falls through to here.
        return False

    def disconnect_safely(self):
        """Disconnect with error handling.

        Does nothing when not connected. ``self.connected`` is cleared only
        after ``disconnect()`` returns without raising.
        """
        if not self.connected:
            return

        try:
            self.client.disconnect()
            self.connected = False
            logger.info("Disconnected successfully")
        except disconnectTimeoutException:
            logger.warning("Disconnect timed out")
        except disconnectError as e:
            logger.error(f"Disconnect error: {e}")
        except Exception as e:
            logger.error(f"Unexpected disconnect error: {e}")
# Usage example
client = RobustIoTClient("robustClient", "endpoint.iot.region.amazonaws.com")
client.configure_credentials("rootCA.crt", "private.key", "certificate.crt")
client.create_client()

if client.connect_with_retry():
    # Publish with error handling
    client.publish_with_retry("test/topic", "Hello World", 1)

    # Subscribe with error handling
    def message_callback(client, userdata, message):
        print(f"Received: {message.payload.decode()}")

    client.subscribe_with_retry("test/response", 1, message_callback)

    # Keep running
    try:
        time.sleep(30)
    finally:
        # Always release the connection, even if the wait is interrupted.
        client.disconnect_safely()

# Exception hierarchy structure
Exception
├── acceptTimeoutException
├── operationError
│ ├── connectError
│ ├── disconnectError
│ ├── publishError
│ ├── publishQueueFullException
│ ├── publishQueueDisabledException
│ ├── subscribeError
│ ├── subackError
│ ├── subscribeQueueFullException
│ ├── subscribeQueueDisabledException
│ ├── unsubscribeError
│ ├── unsubscribeQueueFullException
│ └── unsubscribeQueueDisabledException
├── operationTimeoutException
│ ├── connectTimeoutException
│ ├── disconnectTimeoutException
│ ├── publishTimeoutException
│ ├── subscribeTimeoutException
│ └── unsubscribeTimeoutException
├── wssNoKeyInEnvironmentError
├── wssHandShakeError
├── DiscoveryTimeoutException
├── DiscoveryUnauthorizedException
├── DiscoveryDataNotFoundException
├── DiscoveryInvalidRequestException
├── DiscoveryThrottlingException
├── DiscoveryFailure
└── ClientError
# Exception handling best practices
error_handling_patterns = {
"timeout_exceptions": "Retry with backoff, check network connectivity",
"auth_exceptions": "Don't retry, check credentials and permissions",
"queue_exceptions": "Implement queue management or fallback strategies",
"discovery_exceptions": "Use exponential backoff, fallback to direct connection",
"general_errors": "Log error details, implement graceful degradation"
}

Install with Tessl CLI
npx tessl i tessl/pypi-awsiotpythonsdk