CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-awsiotpythonsdk

SDK for connecting to AWS IoT using Python.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

mqtt-client.mddocs/

Core MQTT Operations

Core MQTT connectivity for AWS IoT providing secure communication through TLS mutual authentication or WebSocket SigV4. Includes connection management, publish/subscribe operations, and advanced features like auto-reconnect, progressive backoff, and offline request queueing.

Capabilities

MQTT Client Creation

Create an MQTT client with flexible configuration options for protocol version, connection type, and session management.

class AWSIoTMQTTClient:
    def __init__(self, clientID: str, protocolType: int = MQTTv3_1_1, useWebsocket: bool = False, cleanSession: bool = True):
        """
        Create AWS IoT MQTT client.
        
        Args:
            clientID (str): Client identifier for MQTT connection
            protocolType (int): MQTT version (MQTTv3_1=3, MQTTv3_1_1=4)
            useWebsocket (bool): Enable MQTT over WebSocket SigV4
            cleanSession (bool): Start with clean session state
        """

Connection Configuration

Configure endpoint, authentication, and connection parameters before establishing connection.

def configureEndpoint(self, hostName: str, portNumber: int):
    """
    Configure AWS IoT endpoint.
    
    Args:
        hostName (str): AWS IoT endpoint hostname
        portNumber (int): Port (8883 for TLS, 443 for WebSocket/ALPN)
    """

def configureCredentials(self, CAFilePath: str, KeyPath: str = "", CertificatePath: str = "", Ciphers: str = None):
    """
    Configure X.509 certificate credentials for TLS mutual authentication.
    
    Args:
        CAFilePath (str): Path to root CA certificate file
        KeyPath (str): Path to private key file (required for TLS)
        CertificatePath (str): Path to device certificate file (required for TLS) 
        Ciphers (str): SSL cipher suite string (optional)
    """

def configureIAMCredentials(self, AWSAccessKeyID: str, AWSSecretAccessKey: str, AWSSessionToken: str = ""):
    """
    Configure IAM credentials for WebSocket SigV4 authentication.
    
    Args:
        AWSAccessKeyID (str): AWS access key ID
        AWSSecretAccessKey (str): AWS secret access key
        AWSSessionToken (str): AWS session token for temporary credentials
    """

def configureUsernamePassword(self, username: str, password: str = None):
    """
    Configure MQTT username and password.
    
    Args:
        username (str): MQTT username
        password (str): MQTT password (optional)
    """

def configureLastWill(self, topic: str, payload: str, QoS: int, retain: bool = False):
    """
    Configure Last Will and Testament message for unexpected disconnections.
    
    Args:
        topic (str): Topic to publish last will message to
        payload (str): Last will message payload
        QoS (int): Quality of Service level (0 or 1)
        retain (bool): Whether to retain the last will message
    """

def clearLastWill(self):
    """
    Clear the previously configured Last Will and Testament.
    """

Advanced Configuration

Configure auto-reconnect behavior, offline queueing, timeouts, and other advanced features.

def configureAutoReconnectBackoffTime(self, baseReconnectQuietTimeSecond: int, maxReconnectQuietTimeSecond: int, stableConnectionTimeSecond: int):
    """
    Configure progressive reconnect backoff timing.
    
    Args:
        baseReconnectQuietTimeSecond (int): Initial backoff time in seconds
        maxReconnectQuietTimeSecond (int): Maximum backoff time in seconds  
        stableConnectionTimeSecond (int): Stable connection threshold in seconds
    """

def configureOfflinePublishQueueing(self, queueSize: int, dropBehavior: int = DROP_NEWEST):
    """
    Configure offline request queueing for publish operations.
    
    Args:
        queueSize (int): Queue size (0=disabled, -1=infinite)
        dropBehavior (int): DROP_OLDEST=0 or DROP_NEWEST=1
    """

def configureDrainingFrequency(self, frequencyInHz: float):
    """
    Configure queue draining speed when connection restored.
    
    Args:
        frequencyInHz (float): Draining frequency in requests per second
    """

def configureConnectDisconnectTimeout(self, timeoutSecond: int):
    """
    Configure connection/disconnection timeout.
    
    Args:
        timeoutSecond (int): Timeout in seconds for CONNACK/disconnect
    """

def configureMQTTOperationTimeout(self, timeoutSecond: int):
    """
    Configure MQTT operation timeout for QoS 1 operations.
    
    Args:
        timeoutSecond (int): Timeout in seconds for PUBACK/SUBACK/UNSUBACK
    """

Connection Management

Establish and manage MQTT connections with both synchronous and asynchronous options.

def connect(self, keepAliveIntervalSecond: int = 600) -> bool:
    """
    Connect to AWS IoT synchronously.
    
    Args:
        keepAliveIntervalSecond (int): MQTT keep-alive interval in seconds
        
    Returns:
        bool: True if connection successful, False otherwise
    """

def connectAsync(self, keepAliveIntervalSecond: int = 600, ackCallback: callable = None) -> int:
    """
    Connect to AWS IoT asynchronously.
    
    Args:
        keepAliveIntervalSecond (int): MQTT keep-alive interval in seconds
        ackCallback (callable): Callback for CONNACK (mid, data) -> None
        
    Returns:
        int: Packet ID for tracking in callback
    """

def disconnect() -> bool:
    """
    Disconnect from AWS IoT synchronously.
    
    Returns:
        bool: True if disconnect successful, False otherwise
    """

def disconnectAsync(self, ackCallback: callable = None) -> int:
    """
    Disconnect from AWS IoT asynchronously.
    
    Args:
        ackCallback (callable): Callback for disconnect completion (mid, data) -> None
        
    Returns:
        int: Packet ID for tracking in callback
    """

Publish Operations

Publish messages to AWS IoT topics with QoS support and asynchronous callbacks.

def publish(self, topic: str, payload: str, QoS: int) -> bool:
    """
    Publish message synchronously.
    
    Args:
        topic (str): MQTT topic name
        payload (str): Message payload
        QoS (int): Quality of Service (0 or 1)
        
    Returns:
        bool: True if publish request sent, False otherwise
    """

def publishAsync(self, topic: str, payload: str, QoS: int, ackCallback: callable = None) -> int:
    """
    Publish message asynchronously.
    
    Args:
        topic (str): MQTT topic name
        payload (str): Message payload
        QoS (int): Quality of Service (0 or 1)
        ackCallback (callable): Callback for PUBACK (mid) -> None (QoS 1 only)
        
    Returns:
        int: Packet ID for tracking in callback  
    """

Subscribe Operations

Subscribe to MQTT topics with message callbacks and QoS support.

def subscribe(self, topic: str, QoS: int, callback: callable) -> bool:
    """
    Subscribe to topic synchronously.
    
    Args:
        topic (str): MQTT topic name or filter
        QoS (int): Quality of Service (0 or 1)
        callback (callable): Message callback (client, userdata, message) -> None
        
    Returns:
        bool: True if subscribe successful, False otherwise
    """

def subscribeAsync(self, topic: str, QoS: int, ackCallback: callable = None, messageCallback: callable = None) -> int:
    """
    Subscribe to topic asynchronously.
    
    Args:
        topic (str): MQTT topic name or filter
        QoS (int): Quality of Service (0 or 1)
        ackCallback (callable): Callback for SUBACK (mid, data) -> None
        messageCallback (callable): Message callback (client, userdata, message) -> None
        
    Returns:
        int: Packet ID for tracking in callback
    """

def unsubscribe(self, topic: str) -> bool:
    """
    Unsubscribe from topic synchronously.
    
    Args:
        topic (str): MQTT topic name or filter
        
    Returns:
        bool: True if unsubscribe successful, False otherwise
    """

def unsubscribeAsync(self, topic: str, ackCallback: callable = None) -> int:
    """
    Unsubscribe from topic asynchronously.
    
    Args:
        topic (str): MQTT topic name or filter
        ackCallback (callable): Callback for UNSUBACK (mid) -> None
        
    Returns:
        int: Packet ID for tracking in callback
    """

Event Callbacks

Configure global callbacks for connection state and message events.

def onOnline(self):
    """
    Override this method to handle online events.
    Called when client connects to AWS IoT.
    """

def onOffline(self):
    """
    Override this method to handle offline events.
    Called when client disconnects from AWS IoT.
    """

def onMessage(self, message):
    """
    Override this method to handle all incoming messages.
    
    Args:
        message: MQTT message with .topic and .payload attributes
    """

Metrics and Socket Configuration

Additional configuration options for metrics collection and custom socket factories.

def enableMetricsCollection(self):
    """Enable SDK metrics collection (enabled by default)."""

def disableMetricsCollection(self):
    """Disable SDK metrics collection."""

def configureSocketFactory(self, socket_factory: callable):
    """
    Configure custom socket factory for proxy support.
    
    Args:
        socket_factory (callable): Function that returns configured socket
    """

Usage Examples

Basic MQTT Publish/Subscribe

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

# Create and configure client
client = AWSIoTPyMQTT.AWSIoTMQTTClient("myDevice")
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

# Configure auto-reconnect
client.configureAutoReconnectBackoffTime(1, 32, 20)
client.configureOfflinePublishQueueing(-1)  # Infinite queue
client.configureDrainingFrequency(2)  # 2 Hz

# Define message callback
def customCallback(client, userdata, message):
    print(f"Received message on {message.topic}: {message.payload.decode()}")

# Connect and subscribe
client.connect()
client.subscribe("device/data", 1, customCallback)

# Publish message
client.publish("device/status", "online", 0)

WebSocket Connection with IAM

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

# Create WebSocket client
client = AWSIoTPyMQTT.AWSIoTMQTTClient("webClient", useWebsocket=True)
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 443)
client.configureIAMCredentials("AKIA...", "secret", "session-token")

# Connect and publish
client.connect()
client.publish("web/data", '{"temperature": 25.0}', 1)
client.disconnect()

Asynchronous Operations

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

client = AWSIoTPyMQTT.AWSIoTMQTTClient("asyncClient")
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

# Async connection callback
def connackCallback(mid, data):
    print(f"Connected with mid: {mid}, result: {data}")

# Async publish callback  
def pubackCallback(mid):
    print(f"Published message mid: {mid}")

# Connect asynchronously
mid = client.connectAsync(ackCallback=connackCallback)
print(f"Connection request sent with mid: {mid}")

# Publish asynchronously
pub_mid = client.publishAsync("async/topic", "hello", 1, pubackCallback)
print(f"Publish request sent with mid: {pub_mid}")

Types

# Protocol version constants
MQTTv3_1 = 3
MQTTv3_1_1 = 4

# Queue drop behavior constants  
DROP_OLDEST = 0
DROP_NEWEST = 1

# Message object (received in callbacks)
class Message:
    topic: str      # Message topic
    payload: bytes  # Message payload as bytes

Install with Tessl CLI

npx tessl i tessl/pypi-awsiotpythonsdk

docs

device-shadows.md

exception-handling.md

greengrass-discovery.md

index.md

iot-jobs.md

mqtt-client.md

tile.json