tessl/pypi-awsiotsdk

AWS IoT SDK based on the AWS Common Runtime for connecting IoT devices to AWS IoT Core services


Fleet Provisioning

This module covers device certificate provisioning and thing registration for automated fleet provisioning workflows: creating a certificate from a CSR, generating keys and a certificate, and registering a device as a thing. Each operation is available through both the V1 (callback-based) and V2 (Future-based) client interfaces.

Capabilities

V1 Identity Client (Callback-Based)

The V1 client provides callback-based operations that follow the traditional publish/subscribe pattern: subscribe to an operation's accepted and rejected responses, then publish the request.

Client Creation

class IotIdentityClient(MqttServiceClient):
    """
    V1 client for AWS IoT Identity/Fleet Provisioning service with callback-based operations.
    
    Parameters:
    - mqtt_connection: MQTT connection (mqtt.Connection or mqtt5.Client)
    """
    def __init__(self, mqtt_connection): ...

Usage example:

from awsiot import mqtt_connection_builder, iotidentity

# Create MQTT connection with provisioning claim certificates
connection = mqtt_connection_builder.mtls_from_path(
    endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
    cert_filepath="/path/to/provisioning-claim.pem.crt",
    pri_key_filepath="/path/to/provisioning-claim.pem.key",
    client_id="provisioning-client-123"
)

# Create identity client
identity_client = iotidentity.IotIdentityClient(connection)

Certificate Operations

Create Certificate from CSR

def publish_create_certificate_from_csr(self, request, qos):
    """
    Publish request to create certificate from Certificate Signing Request.
    
    Parameters:
    - request (CreateCertificateFromCsrRequest): CSR certificate request
    - qos (awscrt.mqtt.QoS): Quality of service
    
    Returns:
    Future: Future that completes when request is published
    """

def subscribe_to_create_certificate_from_csr_accepted(self, request, qos, callback):
    """
    Subscribe to successful create certificate from CSR responses.
    
    Parameters:
    - request (CreateCertificateFromCsrSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called with certificate response
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """

def subscribe_to_create_certificate_from_csr_rejected(self, request, qos, callback):
    """
    Subscribe to rejected create certificate from CSR responses.
    
    Parameters:
    - request (CreateCertificateFromCsrSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called when request is rejected
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """

Create Keys and Certificate

def publish_create_keys_and_certificate(self, request, qos):
    """
    Publish request to create new private key and certificate.
    
    Parameters:
    - request (CreateKeysAndCertificateRequest): Keys and certificate request
    - qos (awscrt.mqtt.QoS): Quality of service
    
    Returns:
    Future: Future that completes when request is published
    """

def subscribe_to_create_keys_and_certificate_accepted(self, request, qos, callback):
    """
    Subscribe to successful create keys and certificate responses.
    
    Parameters:
    - request (CreateKeysAndCertificateSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called with keys and certificate response
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """

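def subscribe_to_create_keys_and_certificate_rejected(self, request, qos, callback):
    """
    Subscribe to rejected create keys and certificate responses.
    
    Parameters:
    - request (CreateKeysAndCertificateSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called when request is rejected
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """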

Register Thing

def publish_register_thing(self, request, qos):
    """
    Publish request to register device thing with provisioning template.
    
    Parameters:
    - request (RegisterThingRequest): Thing registration request
    - qos (awscrt.mqtt.QoS): Quality of service
    
    Returns:
    Future: Future that completes when request is published
    """

def subscribe_to_register_thing_accepted(self, request, qos, callback):
    """
    Subscribe to successful register thing responses.
    
    Parameters:
    - request (RegisterThingSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called with thing registration response
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """

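def subscribe_to_register_thing_rejected(self, request, qos, callback):
    """
    Subscribe to rejected register thing responses.
    
    Parameters:
    - request (RegisterThingSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called when request is rejected
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """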
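
Because each V1 operation reports its outcome through separate accepted and rejected callbacks, it is often convenient to have a single object to wait on. The following is a minimal sketch, using only the standard library, of bridging a callback pair into one `concurrent.futures.Future`. The helper `make_response_future` and the exception `ProvisioningRejected` are hypothetical names and not part of the SDK; the two returned callbacks would be passed to the matching `subscribe_to_..._accepted` and `subscribe_to_..._rejected` methods.

```python
from concurrent.futures import Future


class ProvisioningRejected(Exception):
    """Hypothetical exception carrying the rejected-response payload."""

    def __init__(self, error):
        super().__init__(f"request rejected: {error}")
        self.error = error


def make_response_future():
    """Return (future, on_accepted, on_rejected) for one request.

    The first callback to fire settles the future; later calls are ignored.
    """
    future = Future()

    def on_accepted(response):
        if not future.done():
            future.set_result(response)

    def on_rejected(error):
        if not future.done():
            future.set_exception(ProvisioningRejected(error))

    return future, on_accepted, on_rejected


# Simulated usage: in real code the SDK would invoke these callbacks.
future, on_accepted, on_rejected = make_response_future()
on_accepted({"thing_name": "TempSensor-ABC123456"})
print(future.result(timeout=5))
```

With this shape the main thread can call `future.result(timeout=...)` after publishing, which keeps blocking waits out of the callbacks themselves.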

V2 Identity Client (Future-Based)

The V2 client provides Future-based operations with request-response semantics: each call returns a Future that resolves to the operation's response.

Client Creation

class IotIdentityClientV2:
    """
    V2 client for AWS IoT Identity/Fleet Provisioning service with Future-based operations.
    """
    def __init__(self, connection): ...

Certificate Operations

def create_certificate_from_csr(self, request):
    """
    Create certificate from CSR using request-response pattern.
    
    Parameters:
    - request (CreateCertificateFromCsrRequest): CSR certificate request
    
    Returns:
    Future[CreateCertificateFromCsrResponse]: Future containing certificate response
    """

def create_keys_and_certificate(self, request):
    """
    Create private key and certificate using request-response pattern.
    
    Parameters:
    - request (CreateKeysAndCertificateRequest): Keys and certificate request
    
    Returns:
    Future[CreateKeysAndCertificateResponse]: Future containing keys and certificate
    """

def register_thing(self, request):
    """
    Register device thing using request-response pattern.
    
    Parameters:
    - request (RegisterThingRequest): Thing registration request
    
    Returns:
    Future[RegisterThingResponse]: Future containing registration response
    """
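
The methods above are documented as returning Future objects. Assuming these are `concurrent.futures.Future` instances (the `.result()` calls elsewhere on this page suggest so), they cannot be used directly in an `await` expression. The sketch below shows one way to adapt such a future for `asyncio` with `asyncio.wrap_future`. The function `fake_register_thing` is a hypothetical stand-in for a V2 call and does not talk to AWS.

```python
import asyncio
import threading
from concurrent.futures import Future


def fake_register_thing(serial):
    """Hypothetical stand-in for a V2 call: returns a concurrent Future
    that a background thread completes, as an SDK event loop might."""
    future = Future()
    threading.Thread(
        target=lambda: future.set_result({"thing_name": f"TempSensor-{serial}"})
    ).start()
    return future


async def provision(serial):
    # wrap_future adapts the concurrent Future so the coroutine can await it
    # without blocking the asyncio event loop.
    response = await asyncio.wrap_future(fake_register_thing(serial))
    return response["thing_name"]


thing_name = asyncio.run(provision("ABC123456"))
print(thing_name)
```

In synchronous code the same future can simply be resolved with `.result(timeout=...)`.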

Data Model Classes

Request Classes

@dataclass
class CreateCertificateFromCsrRequest:
    """Request to create certificate from CSR."""
    certificate_signing_request: str

@dataclass
class CreateKeysAndCertificateRequest:
    """Request to create private key and certificate."""
    pass  # No additional parameters required

@dataclass
class RegisterThingRequest:
    """Request to register device thing."""
    template_name: str
    certificate_ownership_token: str
    parameters: Optional[Dict[str, str]] = None

Response Classes

@dataclass
class CreateCertificateFromCsrResponse:
    """Response from create certificate from CSR operation."""
    certificate_pem: Optional[str] = None
    certificate_arn: Optional[str] = None
    certificate_id: Optional[str] = None
    certificate_ownership_token: Optional[str] = None

@dataclass
class CreateKeysAndCertificateResponse:
    """Response from create keys and certificate operation."""
    certificate_pem: Optional[str] = None
    private_key: Optional[str] = None
    certificate_arn: Optional[str] = None
    certificate_id: Optional[str] = None
    certificate_ownership_token: Optional[str] = None

@dataclass
class RegisterThingResponse:
    """Response from register thing operation."""
    device_configuration: Optional[Dict[str, str]] = None
    thing_name: Optional[str] = None

Subscription Request Classes

@dataclass
class CreateCertificateFromCsrSubscriptionRequest:
    """Subscription request for create certificate from CSR responses."""
    pass

@dataclass
class CreateKeysAndCertificateSubscriptionRequest:
    """Subscription request for create keys and certificate responses."""
    pass

@dataclass
class RegisterThingSubscriptionRequest:
    """Subscription request for register thing responses."""
    template_name: str
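
`RegisterThingSubscriptionRequest` carries a `template_name` while the other two subscription requests are empty, because the RegisterThing topics embed the template name. The sketch below spells out the reserved MQTT topics that the AWS IoT fleet provisioning MQTT API documents for JSON payloads. The helpers `request_topic` and `response_topics` are hypothetical and only illustrate the naming scheme; the client builds these topics for you.

```python
def request_topic(operation, template_name=None):
    """Return the request topic for a fleet provisioning operation (JSON payload)."""
    if operation == "CreateKeysAndCertificate":
        return "$aws/certificates/create/json"
    if operation == "CreateCertificateFromCsr":
        return "$aws/certificates/create-from-csr/json"
    if operation == "RegisterThing":
        if not template_name:
            raise ValueError("RegisterThing topics require a template name")
        return f"$aws/provisioning-templates/{template_name}/provision/json"
    raise ValueError(f"unknown operation: {operation}")


def response_topics(operation, template_name=None):
    """Return the (accepted, rejected) topic pair for an operation."""
    base = request_topic(operation, template_name)
    return f"{base}/accepted", f"{base}/rejected"


print(response_topics("RegisterThing", "MyFleetProvisioningTemplate"))
```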

Error Response Classes

@dataclass
class ErrorResponse:
    """Error response from identity operations."""
    error_code: Optional[str] = None
    error_message: Optional[str] = None
    status_code: Optional[int] = None

Usage Examples

V1 Client - Complete Fleet Provisioning Workflow

from awsiot import mqtt_connection_builder, iotidentity
from awscrt import mqtt
import json
import os

# Connect with provisioning claim certificate
connection = mqtt_connection_builder.mtls_from_path(
    endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
    cert_filepath="/path/to/provisioning-claim.pem.crt",
    pri_key_filepath="/path/to/provisioning-claim.pem.key",
    client_id="fleet-provisioning-123"
)

identity_client = iotidentity.IotIdentityClient(connection)
connection.connect().result()

# Global variables to store provisioning results
device_certificate = None
device_private_key = None
thing_name = None
certificate_ownership_token = None

def on_create_keys_accepted(response):
    """Handle successful keys and certificate creation."""
    global device_certificate, device_private_key, certificate_ownership_token
    
    print("Keys and certificate created successfully!")
    device_certificate = response.certificate_pem
    device_private_key = response.private_key
    certificate_ownership_token = response.certificate_ownership_token
    
    # Save certificate and key to files
    with open("/tmp/device-certificate.pem.crt", "w") as f:
        f.write(device_certificate)
    with open("/tmp/device-private.pem.key", "w") as f:
        f.write(device_private_key)
    
    print(f"Certificate ID: {response.certificate_id}")
    print(f"Certificate ARN: {response.certificate_arn}")
    
    # Step 2: Register the thing using the certificate ownership token
    register_request = iotidentity.RegisterThingRequest(
        template_name="MyFleetProvisioningTemplate",
        certificate_ownership_token=certificate_ownership_token,
        parameters={
            "DeviceLocation": "Building-A-Floor-2",
            "DeviceType": "TemperatureSensor"
        }
    )
    identity_client.publish_register_thing(register_request, mqtt.QoS.AT_LEAST_ONCE)

def on_create_keys_rejected(error):
    """Handle keys and certificate creation rejection."""
    print(f"Keys and certificate creation rejected: {error}")

def on_register_thing_accepted(response):
    """Handle successful thing registration."""
    global thing_name
    
    print("Thing registered successfully!")
    thing_name = response.thing_name
    
    print(f"Thing name: {thing_name}")
    print(f"Device configuration: {response.device_configuration}")
    
    # Save device configuration
    if response.device_configuration:
        with open("/tmp/device-config.json", "w") as f:
            json.dump(response.device_configuration, f, indent=2)
    
    print("Fleet provisioning completed successfully!")
    
    # Signal the main thread instead of reconnecting here. Blocking on
    # .result() inside an MQTT callback can deadlock, because callbacks
    # run on the connection's event-loop thread.
    provisioning_done.set()

def on_register_thing_rejected(error):
    """Handle thing registration rejection."""
    print(f"Thing registration rejected: {error}")

def setup_device_connection():
    """Setup permanent device connection with new certificate."""
    print("Setting up device connection with new certificate...")
    
    # Disconnect from provisioning connection
    connection.disconnect().result()
    
    # Create new connection with device certificate
    device_connection = mqtt_connection_builder.mtls_from_path(
        endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
        cert_filepath="/tmp/device-certificate.pem.crt",
        pri_key_filepath="/tmp/device-private.pem.key",
        client_id=thing_name,
        clean_session=False,
        keep_alive_secs=30
    )
    
    device_connection.connect().result()
    print(f"Device {thing_name} connected successfully!")
    
    # Your device application logic starts here
    # ...
    
    device_connection.disconnect().result()

# Set by on_register_thing_accepted once provisioning has finished
import threading
provisioning_done = threading.Event()

# Subscribe to provisioning responses.
# Each subscribe call returns (Future, topic); wait on the Future only.
subscribed, _ = identity_client.subscribe_to_create_keys_and_certificate_accepted(
    iotidentity.CreateKeysAndCertificateSubscriptionRequest(),
    mqtt.QoS.AT_LEAST_ONCE,
    on_create_keys_accepted
)
subscribed.result()

subscribed, _ = identity_client.subscribe_to_create_keys_and_certificate_rejected(
    iotidentity.CreateKeysAndCertificateSubscriptionRequest(),
    mqtt.QoS.AT_LEAST_ONCE,
    on_create_keys_rejected
)
subscribed.result()

subscribed, _ = identity_client.subscribe_to_register_thing_accepted(
    iotidentity.RegisterThingSubscriptionRequest(template_name="MyFleetProvisioningTemplate"),
    mqtt.QoS.AT_LEAST_ONCE,
    on_register_thing_accepted
)
subscribed.result()

subscribed, _ = identity_client.subscribe_to_register_thing_rejected(
    iotidentity.RegisterThingSubscriptionRequest(template_name="MyFleetProvisioningTemplate"),
    mqtt.QoS.AT_LEAST_ONCE,
    on_register_thing_rejected
)
subscribed.result()

# Step 1: Request keys and certificate creation
create_keys_request = iotidentity.CreateKeysAndCertificateRequest()
identity_client.publish_create_keys_and_certificate(create_keys_request, mqtt.QoS.AT_LEAST_ONCE)

print("Fleet provisioning started. Waiting for responses...")

# Keep the main thread alive until registration succeeds or the wait times out
if provisioning_done.wait(timeout=60):
    setup_device_connection()
else:
    print("Provisioning did not complete within 60 seconds")
    connection.disconnect().result()

V1 Client - Certificate from CSR Workflow

from awsiot import mqtt_connection_builder, iotidentity
from awscrt import mqtt
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa

def generate_csr():
    """Generate a Certificate Signing Request."""
    # Generate private key
    private_key = rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048
    )
    
    # Create CSR
    subject = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
        x509.NameAttribute(NameOID.LOCALITY_NAME, "Seattle"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MyCompany"),
        x509.NameAttribute(NameOID.COMMON_NAME, "my-iot-device"),
    ])
    
    csr = x509.CertificateSigningRequestBuilder().subject_name(
        subject
    ).sign(private_key, hashes.SHA256())
    
    # Serialize CSR and private key
    csr_pem = csr.public_bytes(serialization.Encoding.PEM).decode()
    private_key_pem = private_key.private_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PrivateFormat.PKCS8,
        encryption_algorithm=serialization.NoEncryption()
    ).decode()
    
    return csr_pem, private_key_pem

# Generate CSR and private key
csr_pem, device_private_key = generate_csr()

# Save private key (you'll need this with the certificate)
with open("/tmp/device-private.pem.key", "w") as f:
    f.write(device_private_key)

# Connect with provisioning claim certificate
connection = mqtt_connection_builder.mtls_from_path(
    endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
    cert_filepath="/path/to/provisioning-claim.pem.crt",
    pri_key_filepath="/path/to/provisioning-claim.pem.key",
    client_id="csr-provisioning-123"
)

identity_client = iotidentity.IotIdentityClient(connection)
connection.connect().result()

def on_create_cert_from_csr_accepted(response):
    """Handle successful certificate creation from CSR."""
    print("Certificate created from CSR successfully!")
    
    # Save certificate
    with open("/tmp/device-certificate.pem.crt", "w") as f:
        f.write(response.certificate_pem)
    
    print(f"Certificate ID: {response.certificate_id}")
    print(f"Certificate ARN: {response.certificate_arn}")
    
    # Register thing with certificate ownership token
    register_request = iotidentity.RegisterThingRequest(
        template_name="MyFleetProvisioningTemplate",
        certificate_ownership_token=response.certificate_ownership_token,
        parameters={"SerialNumber": "ABC123456"}
    )
    identity_client.publish_register_thing(register_request, mqtt.QoS.AT_LEAST_ONCE)

def on_create_cert_from_csr_rejected(error):
    """Handle certificate creation rejection."""
    print(f"Certificate creation from CSR rejected: {error}")

# Subscribe to responses.
# Each subscribe call returns (Future, topic); wait on the Future only.
accepted_future, _ = identity_client.subscribe_to_create_certificate_from_csr_accepted(
    iotidentity.CreateCertificateFromCsrSubscriptionRequest(),
    mqtt.QoS.AT_LEAST_ONCE,
    on_create_cert_from_csr_accepted
)
accepted_future.result()

rejected_future, _ = identity_client.subscribe_to_create_certificate_from_csr_rejected(
    iotidentity.CreateCertificateFromCsrSubscriptionRequest(),
    mqtt.QoS.AT_LEAST_ONCE,
    on_create_cert_from_csr_rejected
)
rejected_future.result()

# Request certificate creation from CSR
csr_request = iotidentity.CreateCertificateFromCsrRequest(
    certificate_signing_request=csr_pem
)
identity_client.publish_create_certificate_from_csr(csr_request, mqtt.QoS.AT_LEAST_ONCE)

print("CSR provisioning started. Waiting for certificate...")
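
The examples above write the device private key under /tmp with default file permissions, which is convenient for a demo but can leave the key readable by other local users. Below is a minimal sketch, assuming a POSIX system, of creating the key file so that only the owner can read or write it. The function name `write_private_key` and the temporary directory are hypothetical choices for illustration.

```python
import os
import stat
import tempfile


def write_private_key(path, pem_text):
    """Create the file with mode 0600 and write the PEM text into it."""
    # O_EXCL makes the call fail if the path already exists
    fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(fd, "w") as f:
        f.write(pem_text)


key_dir = tempfile.mkdtemp()
key_path = os.path.join(key_dir, "device-private.pem.key")
write_private_key(key_path, "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n")

# Show the resulting permission bits
print(oct(stat.S_IMODE(os.stat(key_path).st_mode)))
```

On a real device the key would normally go to a persistent, access-controlled location rather than a temporary directory.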

V2 Client - Request-Response Pattern

from awsiot import mqtt_connection_builder, iotidentity
import asyncio

async def fleet_provisioning_v2():
    # Create connection with provisioning claim certificate
    connection = mqtt_connection_builder.mtls_from_path(
        endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
        cert_filepath="/path/to/provisioning-claim.pem.crt",
        pri_key_filepath="/path/to/provisioning-claim.pem.key",
        client_id="fleet-v2-client"
    )
    
    # Create V2 identity client
    identity_client = iotidentity.IotIdentityClientV2(connection)
    # The SDK returns concurrent.futures.Future objects, which cannot be
    # awaited directly; asyncio.wrap_future adapts them for asyncio.
    await asyncio.wrap_future(connection.connect())
    
    try:
        # Step 1: Create keys and certificate
        print("Creating keys and certificate...")
        keys_request = iotidentity.CreateKeysAndCertificateRequest()
        keys_response = await asyncio.wrap_future(
            identity_client.create_keys_and_certificate(keys_request)
        )
        
        print(f"Certificate created: {keys_response.certificate_id}")
        
        # Save certificate and private key
        with open("/tmp/device-certificate.pem.crt", "w") as f:
            f.write(keys_response.certificate_pem)
        with open("/tmp/device-private.pem.key", "w") as f:
            f.write(keys_response.private_key)
        
        # Step 2: Register thing
        print("Registering thing...")
        register_request = iotidentity.RegisterThingRequest(
            template_name="MyFleetProvisioningTemplate",
            certificate_ownership_token=keys_response.certificate_ownership_token,
            parameters={
                "DeviceSerial": "DEV123456789",
                "DeviceModel": "TempSensor-v2"
            }
        )
        
        register_response = await asyncio.wrap_future(
            identity_client.register_thing(register_request)
        )
        
        print(f"Thing registered: {register_response.thing_name}")
        print(f"Device configuration: {register_response.device_configuration}")
        
        # Save device configuration
        if register_response.device_configuration:
            import json
            with open("/tmp/device-config.json", "w") as f:
                json.dump(register_response.device_configuration, f, indent=2)
        
        print("Fleet provisioning completed successfully!")
        
        return {
            "thing_name": register_response.thing_name,
            "certificate_path": "/tmp/device-certificate.pem.crt",
            "private_key_path": "/tmp/device-private.pem.key",
            "device_config": register_response.device_configuration
        }
        
    except iotidentity.V2ServiceException as e:
        print(f"Provisioning failed: {e.message}")
        if e.modeled_error:
            print(f"Error details: {e.modeled_error}")
        return None
    
    finally:
        await asyncio.wrap_future(connection.disconnect())

# Run async provisioning
provisioning_result = asyncio.run(fleet_provisioning_v2())

if provisioning_result:
    print(f"Device {provisioning_result['thing_name']} provisioned successfully!")
    
    # Connect with new device certificate
    device_connection = mqtt_connection_builder.mtls_from_path(
        endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
        cert_filepath=provisioning_result["certificate_path"],
        pri_key_filepath=provisioning_result["private_key_path"],
        client_id=provisioning_result["thing_name"]
    )
    
    device_connection.connect().result()
    print("Device connected with new certificate!")
    device_connection.disconnect().result()

Provisioning Template Setup

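Before using fleet provisioning, create a provisioning template in AWS IoT Core. The keys passed in RegisterThingRequest.parameters must match the names declared under Parameters in the template body. In the example below, templateBody is shown expanded for readability; the CreateProvisioningTemplate API expects it as a JSON-encoded string: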

{
  "templateName": "MyFleetProvisioningTemplate",
  "description": "Template for provisioning temperature sensors",
  "templateBody": {
    "Parameters": {
      "DeviceLocation": {
        "Type": "String"
      },
      "DeviceType": {
        "Type": "String" 
      },
      "SerialNumber": {
        "Type": "String"
      }
    },
    "Resources": {
      "certificate": {
        "Properties": {
          "CertificateId": {
            "Ref": "AWS::IoT::Certificate::Id"
          },
          "Status": "Active"
        },
        "Type": "AWS::IoT::Certificate"
      },
      "policy": {
        "Properties": {
          "PolicyName": "TemperatureSensorPolicy"
        },
        "Type": "AWS::IoT::Policy"
      },
      "thing": {
        "OverrideSettings": {
          "AttributePayload": "MERGE",
          "ThingGroups": "DO_NOTHING",
          "ThingTypeName": "REPLACE"
        },
        "Properties": {
          "AttributePayload": {
            "location": {
              "Ref": "DeviceLocation"
            },
            "deviceType": {
              "Ref": "DeviceType"  
            },
            "serialNumber": {
              "Ref": "SerialNumber"
            }
          },
          "ThingName": {
            "Fn::Join": [
              "",
              [
                "TempSensor-",
                {
                  "Ref": "SerialNumber"
                }
              ]
            ]
          },
          "ThingTypeName": "TemperatureSensor"
        },
        "Type": "AWS::IoT::Thing"
      }
    }
  },
  "enabled": true,
  "provisioningRoleArn": "arn:aws:iam::123456789012:role/IoTFleetProvisioningRole"
}
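
A registration request that omits a parameter the template references is likely to be rejected, so it can help to compare the two before deploying. The following rough sketch uses only the standard library. `missing_parameters` is a hypothetical helper that reports declared parameters with no `Default` that a request does not supply, and the `template_body` dict is an abbreviated copy of the Parameters section above. The final line shows how the body could be serialized to the string form that the CreateProvisioningTemplate API takes.

```python
import json

# Abbreviated template body; Resources omitted for brevity
template_body = {
    "Parameters": {
        "DeviceLocation": {"Type": "String"},
        "DeviceType": {"Type": "String"},
        "SerialNumber": {"Type": "String"},
    },
}


def missing_parameters(template_body, request_parameters):
    """Return declared parameters without a Default that the request omits."""
    declared = template_body.get("Parameters", {})
    return sorted(
        name
        for name, spec in declared.items()
        if "Default" not in spec and name not in request_parameters
    )


# A request that only sends SerialNumber lacks the other two parameters
print(missing_parameters(template_body, {"SerialNumber": "ABC123456"}))

# Serialize the body to a JSON string for the API's templateBody field
template_body_json = json.dumps(template_body)
```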

Install with Tessl CLI

npx tessl i tessl/pypi-awsiotsdk
