AWS IoT SDK based on the AWS Common Runtime for connecting IoT devices to AWS IoT Core services
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive MQTT connection management supporting both MQTT3 and MQTT5 protocols with multiple authentication methods, extensive configuration options, and platform-specific certificate store integration.
Creates MQTT3 connection using certificate and private key files from the filesystem.
def mtls_from_path(cert_filepath, pri_key_filepath, **kwargs):
"""
Create MQTT connection using mTLS certificates from file paths.
Parameters:
- cert_filepath (str): Path to certificate file
- pri_key_filepath (str): Path to private key file
- endpoint (str): AWS IoT endpoint hostname
- client_id (str): Unique MQTT client identifier
- client_bootstrap: Client bootstrap for networking
- port (int): Connection port (default: 8883)
- clean_session (bool): Start with clean session
- keep_alive_secs (int): Keep alive interval in seconds
- will: Last will and testament message
- username (str): MQTT username
- password (str): MQTT password
- ca_filepath (str): CA certificate file path
- ca_dirpath (str): CA certificate directory path
- ca_bytes (bytes): CA certificate as bytes
- enable_metrics_collection (bool): Enable SDK metrics
- on_connection_interrupted: Callback for connection interruption
- on_connection_resumed: Callback for connection resumption
- on_connection_success: Callback for successful connection
- on_connection_failure: Callback for connection failure
- on_connection_closed: Callback for connection closure
Returns:
awscrt.mqtt.Connection: Configured MQTT connection
"""
Usage example:
from awsiot import mqtt_connection_builder
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/certificate.pem.crt",
pri_key_filepath="/path/to/private.pem.key",
client_id="my-device-001",
clean_session=False,
keep_alive_secs=30,
on_connection_interrupted=lambda connection, error, **kwargs: print(f"Connection interrupted: {error}"),
on_connection_resumed=lambda connection, return_code, session_present, **kwargs: print("Connection resumed")
)
# Connect and wait for completion
connect_future = connection.connect()
connect_future.result()
Creates MQTT3 connection using certificate and private key data loaded in memory.
def mtls_from_bytes(cert_bytes, pri_key_bytes, **kwargs):
"""
Create MQTT connection using mTLS certificates from in-memory bytes.
Parameters:
- cert_bytes (bytes): Certificate data as bytes
- pri_key_bytes (bytes): Private key data as bytes
- **kwargs: Same parameters as mtls_from_path
Returns:
awscrt.mqtt.Connection: Configured MQTT connection
"""
Uses PKCS#11 hardware security modules or software tokens for private key operations.
def mtls_with_pkcs11(**kwargs):
"""
Create MQTT connection using PKCS#11 for private key operations (Unix only).
Parameters:
- pkcs11_lib (str): Path to PKCS#11 library
- user_pin (str): User PIN for PKCS#11 token
- slot_id (int): PKCS#11 slot ID (optional)
- token_label (str): PKCS#11 token label (optional)
- private_key_label (str): Private key label in PKCS#11 token
- cert_filepath (str): Certificate file path (optional)
- cert_bytes (bytes): Certificate as bytes (optional)
- **kwargs: Same parameters as mtls_from_path
Returns:
awscrt.mqtt.Connection: Configured MQTT connection
"""
Uses PKCS#12 certificate bundles, typically from macOS Keychain.
def mtls_with_pkcs12(pkcs12_filepath, pkcs12_password, **kwargs):
"""
Create MQTT connection using PKCS#12 certificate bundle (macOS only).
Parameters:
- pkcs12_filepath (str): Path to PKCS#12 file
- pkcs12_password (str): Password for PKCS#12 file
- **kwargs: Same parameters as mtls_from_path
Returns:
awscrt.mqtt.Connection: Configured MQTT connection
"""
Uses certificates from Windows certificate store.
def mtls_with_windows_cert_store_path(cert_store_path, **kwargs):
"""
Create MQTT connection using Windows certificate store (Windows only).
Parameters:
- cert_store_path (str): Certificate store path
- **kwargs: Same parameters as mtls_from_path
Returns:
awscrt.mqtt.Connection: Configured MQTT connection
"""
Creates WebSocket connection with automatic AWS request signing using credentials.
def websockets_with_default_aws_signing(region, credentials_provider, **kwargs):
"""
Create WebSocket MQTT connection with AWS request signing.
Parameters:
- region (str): AWS region
- credentials_provider: AWS credentials provider
- **kwargs: Same parameters as mtls_from_path (port defaults to 443)
Returns:
awscrt.mqtt.Connection: Configured MQTT connection
"""
Usage example:
from awsiot import mqtt_connection_builder
from awscrt import auth
# Create credentials provider
credentials_provider = auth.AwsCredentialsProvider.new_default_chain()
connection = mqtt_connection_builder.websockets_with_default_aws_signing(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
region="us-east-1",
credentials_provider=credentials_provider,
client_id="my-device-websocket"
)
Creates WebSocket connection with custom handshake transformation function.
def websockets_with_custom_handshake(websocket_handshake_transform, **kwargs):
"""
Create WebSocket MQTT connection with custom handshake.
Parameters:
- websocket_handshake_transform: Function to transform WebSocket handshake request
- **kwargs: Same parameters as mtls_from_path (port defaults to 443)
Returns:
awscrt.mqtt.Connection: Configured MQTT connection
"""
Uses AWS IoT custom authorizers for device authentication without certificates.
def direct_with_custom_authorizer(**kwargs):
"""
Create direct MQTT connection using custom authorizer.
Parameters:
- auth_username (str): Username for custom authorizer
- auth_authorizer_name (str): Name of the custom authorizer
- auth_authorizer_signature (str): Signature for authorizer
- auth_password (str): Password for custom authorizer
- auth_token_key_name (str): Token key name for authorizer
- auth_token_value (str): Token value for authorizer
- **kwargs: Same parameters as mtls_from_path
Returns:
awscrt.mqtt.Connection: Configured MQTT connection
"""
def websockets_with_custom_authorizer(**kwargs):
"""
Create WebSocket MQTT connection using custom authorizer.
Parameters:
- Same as direct_with_custom_authorizer
- **kwargs: Same parameters as mtls_from_path (port defaults to 443)
Returns:
awscrt.mqtt.Connection: Configured MQTT connection
"""
Creates connection with default TLS context and options.
def new_default_builder(**kwargs):
"""
Create MQTT connection with default TLS options.
Parameters:
- **kwargs: Same parameters as mtls_from_path
Returns:
awscrt.mqtt.Connection: Configured MQTT connection
"""
MQTT5 client builders provide the same authentication methods as MQTT3 but return MQTT5 clients with enhanced features.
def mtls_from_path(cert_filepath, pri_key_filepath, **kwargs):
"""
Create MQTT5 client using mTLS certificates from file paths.
Parameters:
- cert_filepath (str): Path to certificate file
- pri_key_filepath (str): Path to private key file
- endpoint (str): AWS IoT endpoint hostname
- client_id (str): Unique MQTT client identifier
- client_options: MQTT5 client options
- connect_options: MQTT5 connect options
- session_expiry_interval_sec (int): Session expiry interval
- request_response_information (bool): Request response information
- request_problem_information (bool): Request problem information
- receive_maximum (int): Maximum receive messages
- maximum_packet_size (int): Maximum packet size
- will_delay_interval_sec (int): Will delay interval
- session_behavior: Session behavior configuration
- extended_validation_and_flow_control_options: Flow control options
- offline_queue_behavior: Offline queue behavior
- topic_aliasing_options: Topic aliasing configuration
- retry_jitter_mode: Retry jitter mode
- min_reconnect_delay_ms (int): Minimum reconnect delay
- max_reconnect_delay_ms (int): Maximum reconnect delay
- connack_timeout_ms (int): CONNACK timeout
- ack_timeout_sec (int): Acknowledgment timeout
- on_publish_received: Callback for received publishes
- on_lifecycle_stopped: Lifecycle stopped callback
- on_lifecycle_attempting_connect: Attempting connect callback
- on_lifecycle_connection_success: Connection success callback
- on_lifecycle_connection_failure: Connection failure callback
- on_lifecycle_disconnection: Disconnection callback
Returns:
awscrt.mqtt5.Client: Configured MQTT5 client
"""
All other MQTT5 authentication methods follow the same pattern as MQTT3 but return awscrt.mqtt5.Client objects:
- mtls_from_bytes(cert_bytes, pri_key_bytes, **kwargs)
- mtls_with_pkcs11(**kwargs) (Unix only)
- mtls_with_pkcs12(pkcs12_filepath, pkcs12_password, **kwargs) (macOS only)
- mtls_with_windows_cert_store_path(cert_store_path, **kwargs) (Windows only)
- websockets_with_default_aws_signing(region, credentials_provider, **kwargs)
- websockets_with_custom_handshake(websocket_handshake_transform, **kwargs)
- direct_with_custom_authorizer(**kwargs)
- websockets_with_custom_authorizer(**kwargs)
- new_default_builder(**kwargs)
DEFAULT_WEBSOCKET_MQTT_PORT = 443
DEFAULT_DIRECT_MQTT_PORT = 8883
DEFAULT_KEEP_ALIVE = 1200
from awsiot import mqtt_connection_builder
from awscrt import mqtt
import sys
def on_connection_interrupted(connection, error, **kwargs):
print(f"Connection interrupted. Error: {error}")
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print(f"Connection resumed. Return code: {return_code} Session present: {session_present}")
def on_connection_success(connection, callback_data, **kwargs):
print("Connection successful")
def on_connection_failure(connection, callback_data, **kwargs):
print("Connection failed")
def on_connection_closed(connection, callback_data, **kwargs):
print("Connection closed")
try:
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/certificate.pem.crt",
pri_key_filepath="/path/to/private.pem.key",
client_id="my-device-123",
clean_session=False,
keep_alive_secs=30,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
on_connection_success=on_connection_success,
on_connection_failure=on_connection_failure,
on_connection_closed=on_connection_closed
)
# Connect
connect_future = connection.connect()
connect_future.result()
print("Connected successfully!")
# Your application logic here
# Disconnect
disconnect_future = connection.disconnect()
disconnect_future.result()
print("Disconnected successfully!")
except Exception as e:
print(f"Connection error: {e}")
sys.exit(1)
from awsiot import mqtt_connection_builder
from awscrt import auth
import boto3
# Get credentials from boto3 session
session = boto3.Session()
credentials = session.get_credentials()
# Create credentials provider
credentials_provider = auth.AwsCredentialsProvider.new_static(
access_key_id=credentials.access_key,
secret_access_key=credentials.secret_key,
session_token=credentials.token
)
connection = mqtt_connection_builder.websockets_with_default_aws_signing(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
region=session.region_name,
credentials_provider=credentials_provider,
client_id="websocket-client-123",
clean_session=True
)
connect_future = connection.connect()
connect_future.result()
Install with Tessl CLI
npx tessl i tessl/pypi-awsiotsdk