AWS IoT SDK based on the AWS Common Runtime for connecting IoT devices to AWS IoT Core services
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Device certificate provisioning and thing registration capabilities for automated fleet provisioning workflows including certificate creation from CSR, keys and certificate generation, and device thing registration through both V1 (callback-based) and V2 (Future-based) client interfaces.
The V1 client provides callback-based operations following the traditional publish/subscribe pattern.
class IotIdentityClient(MqttServiceClient):
"""
V1 client for AWS IoT Identity/Fleet Provisioning service with callback-based operations.
Parameters:
- mqtt_connection: MQTT connection (mqtt.Connection or mqtt5.Client)
"""
def __init__(self, mqtt_connection): ...Usage example:
from awsiot import mqtt_connection_builder, iotidentity
# Create MQTT connection with provisioning claim certificates
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/provisioning-claim.pem.crt",
pri_key_filepath="/path/to/provisioning-claim.pem.key",
client_id="provisioning-client-123"
)
# Create identity client
identity_client = iotidentity.IotIdentityClient(connection)def publish_create_certificate_from_csr(self, request, qos):
"""
Publish request to create certificate from Certificate Signing Request.
Parameters:
- request (CreateCertificateFromCsrRequest): CSR certificate request
- qos (awscrt.mqtt.QoS): Quality of service
Returns:
Future: Future that completes when request is published
"""
def subscribe_to_create_certificate_from_csr_accepted(self, request, qos, callback):
"""
Subscribe to successful create certificate from CSR responses.
Parameters:
- request (CreateCertificateFromCsrSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called with certificate response
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""
def subscribe_to_create_certificate_from_csr_rejected(self, request, qos, callback):
"""
Subscribe to rejected create certificate from CSR responses.
Parameters:
- request (CreateCertificateFromCsrSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when request is rejected
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""def publish_create_keys_and_certificate(self, request, qos):
"""
Publish request to create new private key and certificate.
Parameters:
- request (CreateKeysAndCertificateRequest): Keys and certificate request
- qos (awscrt.mqtt.QoS): Quality of service
Returns:
Future: Future that completes when request is published
"""
def subscribe_to_create_keys_and_certificate_accepted(self, request, qos, callback):
"""
Subscribe to successful create keys and certificate responses.
Parameters:
- request (CreateKeysAndCertificateSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called with keys and certificate response
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""
def subscribe_to_create_keys_and_certificate_rejected(self, request, qos, callback):
"""
Subscribe to rejected create keys and certificate responses.
"""def publish_register_thing(self, request, qos):
"""
Publish request to register device thing with provisioning template.
Parameters:
- request (RegisterThingRequest): Thing registration request
- qos (awscrt.mqtt.QoS): Quality of service
Returns:
Future: Future that completes when request is published
"""
def subscribe_to_register_thing_accepted(self, request, qos, callback):
"""
Subscribe to successful register thing responses.
Parameters:
- request (RegisterThingSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called with thing registration response
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""
def subscribe_to_register_thing_rejected(self, request, qos, callback):
"""
Subscribe to rejected register thing responses.
"""The V2 client provides Future-based operations with request-response semantics.
class IotIdentityClientV2:
"""
V2 client for AWS IoT Identity/Fleet Provisioning service with Future-based operations.
"""
def __init__(self, connection): ...def create_certificate_from_csr(self, request):
"""
Create certificate from CSR using request-response pattern.
Parameters:
- request (CreateCertificateFromCsrRequest): CSR certificate request
Returns:
Future[CreateCertificateFromCsrResponse]: Future containing certificate response
"""
def create_keys_and_certificate(self, request):
"""
Create private key and certificate using request-response pattern.
Parameters:
- request (CreateKeysAndCertificateRequest): Keys and certificate request
Returns:
Future[CreateKeysAndCertificateResponse]: Future containing keys and certificate
"""
def register_thing(self, request):
"""
Register device thing using request-response pattern.
Parameters:
- request (RegisterThingRequest): Thing registration request
Returns:
Future[RegisterThingResponse]: Future containing registration response
"""@dataclass
class CreateCertificateFromCsrRequest:
"""Request to create certificate from CSR."""
certificate_signing_request: str
@dataclass
class CreateKeysAndCertificateRequest:
"""Request to create private key and certificate."""
pass # No additional parameters required
@dataclass
class RegisterThingRequest:
"""Request to register device thing."""
template_name: str
certificate_ownership_token: str
parameters: Optional[Dict[str, str]] = None@dataclass
class CreateCertificateFromCsrResponse:
"""Response from create certificate from CSR operation."""
certificate_pem: Optional[str] = None
certificate_arn: Optional[str] = None
certificate_id: Optional[str] = None
certificate_ownership_token: Optional[str] = None
@dataclass
class CreateKeysAndCertificateResponse:
"""Response from create keys and certificate operation."""
certificate_pem: Optional[str] = None
private_key: Optional[str] = None
certificate_arn: Optional[str] = None
certificate_id: Optional[str] = None
certificate_ownership_token: Optional[str] = None
@dataclass
class RegisterThingResponse:
"""Response from register thing operation."""
device_configuration: Optional[Dict[str, str]] = None
thing_name: Optional[str] = None@dataclass
class CreateCertificateFromCsrSubscriptionRequest:
"""Subscription request for create certificate from CSR responses."""
pass
@dataclass
class CreateKeysAndCertificateSubscriptionRequest:
"""Subscription request for create keys and certificate responses."""
pass
@dataclass
class RegisterThingSubscriptionRequest:
"""Subscription request for register thing responses."""
template_name: str@dataclass
class ErrorResponse:
"""Error response from identity operations."""
error_code: Optional[str] = None
error_message: Optional[str] = None
status_code: Optional[int] = Nonefrom awsiot import mqtt_connection_builder, iotidentity
from awscrt import mqtt
import json
import os
# Step 1: Connect with provisioning claim certificate
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/provisioning-claim.pem.crt",
pri_key_filepath="/path/to/provisioning-claim.pem.key",
client_id="fleet-provisioning-123"
)
identity_client = iotidentity.IotIdentityClient(connection)
connection.connect().result()
# Global variables to store provisioning results
device_certificate = None
device_private_key = None
thing_name = None
certificate_ownership_token = None
def on_create_keys_accepted(response):
"""Handle successful keys and certificate creation."""
global device_certificate, device_private_key, certificate_ownership_token
print("Keys and certificate created successfully!")
device_certificate = response.certificate_pem
device_private_key = response.private_key
certificate_ownership_token = response.certificate_ownership_token
# Save certificate and key to files
with open("/tmp/device-certificate.pem.crt", "w") as f:
f.write(device_certificate)
with open("/tmp/device-private.pem.key", "w") as f:
f.write(device_private_key)
print(f"Certificate ID: {response.certificate_id}")
print(f"Certificate ARN: {response.certificate_arn}")
# Step 2: Register the thing using the certificate ownership token
register_request = iotidentity.RegisterThingRequest(
template_name="MyFleetProvisioningTemplate",
certificate_ownership_token=certificate_ownership_token,
parameters={
"DeviceLocation": "Building-A-Floor-2",
"DeviceType": "TemperatureSensor"
}
)
identity_client.publish_register_thing(register_request, mqtt.QoS.AT_LEAST_ONCE)
def on_create_keys_rejected(error):
"""Handle keys and certificate creation rejection."""
print(f"Keys and certificate creation rejected: {error}")
def on_register_thing_accepted(response):
"""Handle successful thing registration."""
global thing_name
print("Thing registered successfully!")
thing_name = response.thing_name
print(f"Thing name: {thing_name}")
print(f"Device configuration: {response.device_configuration}")
# Save device configuration
if response.device_configuration:
with open("/tmp/device-config.json", "w") as f:
json.dump(response.device_configuration, f, indent=2)
print("Fleet provisioning completed successfully!")
# Now you can disconnect from the provisioning endpoint
# and connect using the new device certificate
setup_device_connection()
def on_register_thing_rejected(error):
"""Handle thing registration rejection."""
print(f"Thing registration rejected: {error}")
def setup_device_connection():
"""Setup permanent device connection with new certificate."""
print("Setting up device connection with new certificate...")
# Disconnect from provisioning connection
connection.disconnect().result()
# Create new connection with device certificate
device_connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/tmp/device-certificate.pem.crt",
pri_key_filepath="/tmp/device-private.pem.key",
client_id=thing_name,
clean_session=False,
keep_alive_secs=30
)
device_connection.connect().result()
print(f"Device {thing_name} connected successfully!")
# Your device application logic starts here
# ...
device_connection.disconnect().result()
# Subscribe to provisioning responses
identity_client.subscribe_to_create_keys_and_certificate_accepted(
iotidentity.CreateKeysAndCertificateSubscriptionRequest(),
mqtt.QoS.AT_LEAST_ONCE,
on_create_keys_accepted
).result()
identity_client.subscribe_to_create_keys_and_certificate_rejected(
iotidentity.CreateKeysAndCertificateSubscriptionRequest(),
mqtt.QoS.AT_LEAST_ONCE,
on_create_keys_rejected
).result()
identity_client.subscribe_to_register_thing_accepted(
iotidentity.RegisterThingSubscriptionRequest(template_name="MyFleetProvisioningTemplate"),
mqtt.QoS.AT_LEAST_ONCE,
on_register_thing_accepted
).result()
identity_client.subscribe_to_register_thing_rejected(
iotidentity.RegisterThingSubscriptionRequest(template_name="MyFleetProvisioningTemplate"),
mqtt.QoS.AT_LEAST_ONCE,
on_register_thing_rejected
).result()
# Step 1: Request keys and certificate creation
create_keys_request = iotidentity.CreateKeysAndCertificateRequest()
identity_client.publish_create_keys_and_certificate(create_keys_request, mqtt.QoS.AT_LEAST_ONCE)
print("Fleet provisioning started. Waiting for responses...")from awsiot import mqtt_connection_builder, iotidentity
from awscrt import mqtt
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
def generate_csr():
"""Generate a Certificate Signing Request."""
# Generate private key
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048
)
# Create CSR
subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
x509.NameAttribute(NameOID.LOCALITY_NAME, "Seattle"),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MyCompany"),
x509.NameAttribute(NameOID.COMMON_NAME, "my-iot-device"),
])
csr = x509.CertificateSigningRequestBuilder().subject_name(
subject
).sign(private_key, hashes.SHA256())
# Serialize CSR and private key
csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode()
private_key_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode()
return csr_pem, private_key_pem
# Generate CSR and private key
csr_pem, device_private_key = generate_csr()
# Save private key (you'll need this with the certificate)
with open("/tmp/device-private.pem.key", "w") as f:
f.write(device_private_key)
# Connect with provisioning claim certificate
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/provisioning-claim.pem.crt",
pri_key_filepath="/path/to/provisioning-claim.pem.key",
client_id="csr-provisioning-123"
)
identity_client = iotidentity.IotIdentityClient(connection)
connection.connect().result()
def on_create_cert_from_csr_accepted(response):
"""Handle successful certificate creation from CSR."""
print("Certificate created from CSR successfully!")
# Save certificate
with open("/tmp/device-certificate.pem.crt", "w") as f:
f.write(response.certificate_pem)
print(f"Certificate ID: {response.certificate_id}")
print(f"Certificate ARN: {response.certificate_arn}")
# Register thing with certificate ownership token
register_request = iotidentity.RegisterThingRequest(
template_name="MyFleetProvisioningTemplate",
certificate_ownership_token=response.certificate_ownership_token,
parameters={"SerialNumber": "ABC123456"}
)
identity_client.publish_register_thing(register_request, mqtt.QoS.AT_LEAST_ONCE)
def on_create_cert_from_csr_rejected(error):
"""Handle certificate creation rejection."""
print(f"Certificate creation from CSR rejected: {error}")
# Subscribe to responses
identity_client.subscribe_to_create_certificate_from_csr_accepted(
iotidentity.CreateCertificateFromCsrSubscriptionRequest(),
mqtt.QoS.AT_LEAST_ONCE,
on_create_cert_from_csr_accepted
).result()
identity_client.subscribe_to_create_certificate_from_csr_rejected(
iotidentity.CreateCertificateFromCsrSubscriptionRequest(),
mqtt.QoS.AT_LEAST_ONCE,
on_create_cert_from_csr_rejected
).result()
# Request certificate creation from CSR
csr_request = iotidentity.CreateCertificateFromCsrRequest(
certificate_signing_request=csr_pem
)
identity_client.publish_create_certificate_from_csr(csr_request, mqtt.QoS.AT_LEAST_ONCE)
print("CSR provisioning started. Waiting for certificate...")from awsiot import mqtt_connection_builder, iotidentity
import asyncio
async def fleet_provisioning_v2():
# Create connection with provisioning claim certificate
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/provisioning-claim.pem.crt",
pri_key_filepath="/path/to/provisioning-claim.pem.key",
client_id="fleet-v2-client"
)
# Create V2 identity client
identity_client = iotidentity.IotIdentityClientV2(connection)
await connection.connect()
try:
# Step 1: Create keys and certificate
print("Creating keys and certificate...")
keys_request = iotidentity.CreateKeysAndCertificateRequest()
keys_response = await identity_client.create_keys_and_certificate(keys_request)
print(f"Certificate created: {keys_response.certificate_id}")
# Save certificate and private key
with open("/tmp/device-certificate.pem.crt", "w") as f:
f.write(keys_response.certificate_pem)
with open("/tmp/device-private.pem.key", "w") as f:
f.write(keys_response.private_key)
# Step 2: Register thing
print("Registering thing...")
register_request = iotidentity.RegisterThingRequest(
template_name="MyFleetProvisioningTemplate",
certificate_ownership_token=keys_response.certificate_ownership_token,
parameters={
"DeviceSerial": "DEV123456789",
"DeviceModel": "TempSensor-v2"
}
)
register_response = await identity_client.register_thing(register_request)
print(f"Thing registered: {register_response.thing_name}")
print(f"Device configuration: {register_response.device_configuration}")
# Save device configuration
if register_response.device_configuration:
import json
with open("/tmp/device-config.json", "w") as f:
json.dump(register_response.device_configuration, f, indent=2)
print("Fleet provisioning completed successfully!")
return {
"thing_name": register_response.thing_name,
"certificate_path": "/tmp/device-certificate.pem.crt",
"private_key_path": "/tmp/device-private.pem.key",
"device_config": register_response.device_configuration
}
except iotidentity.V2ServiceException as e:
print(f"Provisioning failed: {e.message}")
if e.modeled_error:
print(f"Error details: {e.modeled_error}")
return None
finally:
await connection.disconnect()
# Run async provisioning
provisioning_result = asyncio.run(fleet_provisioning_v2())
if provisioning_result:
print(f"Device {provisioning_result['thing_name']} provisioned successfully!")
# Connect with new device certificate
device_connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath=provisioning_result["certificate_path"],
pri_key_filepath=provisioning_result["private_key_path"],
client_id=provisioning_result["thing_name"]
)
device_connection.connect().result()
print("Device connected with new certificate!")
device_connection.disconnect().result()Before using fleet provisioning, you need to create a provisioning template in AWS IoT Core:
{
"templateName": "MyFleetProvisioningTemplate",
"description": "Template for provisioning temperature sensors",
"templateBody": {
"Parameters": {
"DeviceLocation": {
"Type": "String"
},
"DeviceType": {
"Type": "String"
},
"SerialNumber": {
"Type": "String"
}
},
"Resources": {
"certificate": {
"Properties": {
"CertificateId": {
"Ref": "AWS::IoT::Certificate::Id"
},
"Status": "Active"
},
"Type": "AWS::IoT::Certificate"
},
"policy": {
"Properties": {
"PolicyName": "TemperatureSensorPolicy"
},
"Type": "AWS::IoT::Policy"
},
"thing": {
"OverrideSettings": {
"AttributePayload": "MERGE",
"ThingGroups": "DO_NOTHING",
"ThingTypeName": "REPLACE"
},
"Properties": {
"AttributePayload": {
"location": {
"Ref": "DeviceLocation"
},
"deviceType": {
"Ref": "DeviceType"
},
"serialNumber": {
"Ref": "SerialNumber"
}
},
"ThingName": {
"Fn::Join": [
"",
[
"TempSensor-",
{
"Ref": "SerialNumber"
}
]
]
},
"ThingTypeName": "TemperatureSensor"
},
"Type": "AWS::IoT::Thing"
}
}
},
"enabled": true,
"provisioningRoleArn": "arn:aws:iam::123456789012:role/IoTFleetProvisioningRole"
}Install with Tessl CLI
npx tessl i tessl/pypi-awsiotsdk