SDK for connecting to AWS IoT using Python.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core MQTT connectivity for AWS IoT providing secure communication through TLS mutual authentication or WebSocket SigV4. Includes connection management, publish/subscribe operations, and advanced features like auto-reconnect, progressive backoff, and offline request queueing.
Create an MQTT client with flexible configuration options for protocol version, connection type, and session management.
class AWSIoTMQTTClient:
def __init__(self, clientID: str, protocolType: int = MQTTv3_1_1, useWebsocket: bool = False, cleanSession: bool = True):
"""
Create AWS IoT MQTT client.
Args:
clientID (str): Client identifier for MQTT connection
protocolType (int): MQTT version (MQTTv3_1=3, MQTTv3_1_1=4)
useWebsocket (bool): Enable MQTT over WebSocket SigV4
cleanSession (bool): Start with clean session state
"""Configure endpoint, authentication, and connection parameters before establishing connection.
def configureEndpoint(self, hostName: str, portNumber: int):
"""
Configure AWS IoT endpoint.
Args:
hostName (str): AWS IoT endpoint hostname
portNumber (int): Port (8883 for TLS, 443 for WebSocket/ALPN)
"""
def configureCredentials(self, CAFilePath: str, KeyPath: str = "", CertificatePath: str = "", Ciphers: str = None):
"""
Configure X.509 certificate credentials for TLS mutual authentication.
Args:
CAFilePath (str): Path to root CA certificate file
KeyPath (str): Path to private key file (required for TLS)
CertificatePath (str): Path to device certificate file (required for TLS)
Ciphers (str): SSL cipher suite string (optional)
"""
def configureIAMCredentials(self, AWSAccessKeyID: str, AWSSecretAccessKey: str, AWSSessionToken: str = ""):
"""
Configure IAM credentials for WebSocket SigV4 authentication.
Args:
AWSAccessKeyID (str): AWS access key ID
AWSSecretAccessKey (str): AWS secret access key
AWSSessionToken (str): AWS session token for temporary credentials
"""
def configureUsernamePassword(self, username: str, password: str = None):
"""
Configure MQTT username and password.
Args:
username (str): MQTT username
password (str): MQTT password (optional)
"""
def configureLastWill(self, topic: str, payload: str, QoS: int, retain: bool = False):
"""
Configure Last Will and Testament message for unexpected disconnections.
Args:
topic (str): Topic to publish last will message to
payload (str): Last will message payload
QoS (int): Quality of Service level (0 or 1)
retain (bool): Whether to retain the last will message
"""
def clearLastWill(self):
"""
Clear the previously configured Last Will and Testament.
"""Configure auto-reconnect behavior, offline queueing, timeouts, and other advanced features.
def configureAutoReconnectBackoffTime(self, baseReconnectQuietTimeSecond: int, maxReconnectQuietTimeSecond: int, stableConnectionTimeSecond: int):
"""
Configure progressive reconnect backoff timing.
Args:
baseReconnectQuietTimeSecond (int): Initial backoff time in seconds
maxReconnectQuietTimeSecond (int): Maximum backoff time in seconds
stableConnectionTimeSecond (int): Stable connection threshold in seconds
"""
def configureOfflinePublishQueueing(self, queueSize: int, dropBehavior: int = DROP_NEWEST):
"""
Configure offline request queueing for publish operations.
Args:
queueSize (int): Queue size (0=disabled, -1=infinite)
dropBehavior (int): DROP_OLDEST=0 or DROP_NEWEST=1
"""
def configureDrainingFrequency(self, frequencyInHz: float):
"""
Configure queue draining speed when connection restored.
Args:
frequencyInHz (float): Draining frequency in requests per second
"""
def configureConnectDisconnectTimeout(self, timeoutSecond: int):
"""
Configure connection/disconnection timeout.
Args:
timeoutSecond (int): Timeout in seconds for CONNACK/disconnect
"""
def configureMQTTOperationTimeout(self, timeoutSecond: int):
"""
Configure MQTT operation timeout for QoS 1 operations.
Args:
timeoutSecond (int): Timeout in seconds for PUBACK/SUBACK/UNSUBACK
"""Establish and manage MQTT connections with both synchronous and asynchronous options.
def connect(self, keepAliveIntervalSecond: int = 600) -> bool:
"""
Connect to AWS IoT synchronously.
Args:
keepAliveIntervalSecond (int): MQTT keep-alive interval in seconds
Returns:
bool: True if connection successful, False otherwise
"""
def connectAsync(self, keepAliveIntervalSecond: int = 600, ackCallback: callable = None) -> int:
"""
Connect to AWS IoT asynchronously.
Args:
keepAliveIntervalSecond (int): MQTT keep-alive interval in seconds
ackCallback (callable): Callback for CONNACK (mid, data) -> None
Returns:
int: Packet ID for tracking in callback
"""
def disconnect() -> bool:
"""
Disconnect from AWS IoT synchronously.
Returns:
bool: True if disconnect successful, False otherwise
"""
def disconnectAsync(self, ackCallback: callable = None) -> int:
"""
Disconnect from AWS IoT asynchronously.
Args:
ackCallback (callable): Callback for disconnect completion (mid, data) -> None
Returns:
int: Packet ID for tracking in callback
"""Publish messages to AWS IoT topics with QoS support and asynchronous callbacks.
def publish(self, topic: str, payload: str, QoS: int) -> bool:
"""
Publish message synchronously.
Args:
topic (str): MQTT topic name
payload (str): Message payload
QoS (int): Quality of Service (0 or 1)
Returns:
bool: True if publish request sent, False otherwise
"""
def publishAsync(self, topic: str, payload: str, QoS: int, ackCallback: callable = None) -> int:
"""
Publish message asynchronously.
Args:
topic (str): MQTT topic name
payload (str): Message payload
QoS (int): Quality of Service (0 or 1)
ackCallback (callable): Callback for PUBACK (mid) -> None (QoS 1 only)
Returns:
int: Packet ID for tracking in callback
"""Subscribe to MQTT topics with message callbacks and QoS support.
def subscribe(self, topic: str, QoS: int, callback: callable) -> bool:
"""
Subscribe to topic synchronously.
Args:
topic (str): MQTT topic name or filter
QoS (int): Quality of Service (0 or 1)
callback (callable): Message callback (client, userdata, message) -> None
Returns:
bool: True if subscribe successful, False otherwise
"""
def subscribeAsync(self, topic: str, QoS: int, ackCallback: callable = None, messageCallback: callable = None) -> int:
"""
Subscribe to topic asynchronously.
Args:
topic (str): MQTT topic name or filter
QoS (int): Quality of Service (0 or 1)
ackCallback (callable): Callback for SUBACK (mid, data) -> None
messageCallback (callable): Message callback (client, userdata, message) -> None
Returns:
int: Packet ID for tracking in callback
"""
def unsubscribe(self, topic: str) -> bool:
"""
Unsubscribe from topic synchronously.
Args:
topic (str): MQTT topic name or filter
Returns:
bool: True if unsubscribe successful, False otherwise
"""
def unsubscribeAsync(self, topic: str, ackCallback: callable = None) -> int:
"""
Unsubscribe from topic asynchronously.
Args:
topic (str): MQTT topic name or filter
ackCallback (callable): Callback for UNSUBACK (mid) -> None
Returns:
int: Packet ID for tracking in callback
"""Configure global callbacks for connection state and message events.
def onOnline(self):
"""
Override this method to handle online events.
Called when client connects to AWS IoT.
"""
def onOffline(self):
"""
Override this method to handle offline events.
Called when client disconnects from AWS IoT.
"""
def onMessage(self, message):
"""
Override this method to handle all incoming messages.
Args:
message: MQTT message with .topic and .payload attributes
"""Additional configuration options for metrics collection and custom socket factories.
def enableMetricsCollection(self):
"""Enable SDK metrics collection (enabled by default)."""
def disableMetricsCollection(self):
"""Disable SDK metrics collection."""
def configureSocketFactory(self, socket_factory: callable):
"""
Configure custom socket factory for proxy support.
Args:
socket_factory (callable): Function that returns configured socket
"""import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
# Create and configure client
client = AWSIoTPyMQTT.AWSIoTMQTTClient("myDevice")
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
# Configure auto-reconnect
client.configureAutoReconnectBackoffTime(1, 32, 20)
client.configureOfflinePublishQueueing(-1) # Infinite queue
client.configureDrainingFrequency(2) # 2 Hz
# Define message callback
def customCallback(client, userdata, message):
print(f"Received message on {message.topic}: {message.payload.decode()}")
# Connect and subscribe
client.connect()
client.subscribe("device/data", 1, customCallback)
# Publish message
client.publish("device/status", "online", 0)import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
# Create WebSocket client
client = AWSIoTPyMQTT.AWSIoTMQTTClient("webClient", useWebsocket=True)
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 443)
client.configureIAMCredentials("AKIA...", "secret", "session-token")
# Connect and publish
client.connect()
client.publish("web/data", '{"temperature": 25.0}', 1)
client.disconnect()import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
client = AWSIoTPyMQTT.AWSIoTMQTTClient("asyncClient")
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
# Async connection callback
def connackCallback(mid, data):
print(f"Connected with mid: {mid}, result: {data}")
# Async publish callback
def pubackCallback(mid):
print(f"Published message mid: {mid}")
# Connect asynchronously
mid = client.connectAsync(ackCallback=connackCallback)
print(f"Connection request sent with mid: {mid}")
# Publish asynchronously
pub_mid = client.publishAsync("async/topic", "hello", 1, pubackCallback)
print(f"Publish request sent with mid: {pub_mid}")# Protocol version constants
MQTTv3_1 = 3
MQTTv3_1_1 = 4
# Queue drop behavior constants
DROP_OLDEST = 0
DROP_NEWEST = 1
# Message object (received in callbacks)
class Message:
topic: str # Message topic
payload: bytes # Message payload as bytesInstall with Tessl CLI
npx tessl i tessl/pypi-awsiotpythonsdk