Microsoft Azure IoT Hub Management Client Library for programmatic management of Azure IoT Hub resources through the Azure Resource Manager API
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive security management including X.509 certificate operations, shared access signature key management, and certificate verification workflows for device authentication. This module enables secure device-to-cloud and cloud-to-device communications through multiple authentication mechanisms.
Management of shared access signature authorization rules that control access to IoT Hub operations with granular permission control.
def list_keys(
    resource_group_name: str,
    resource_name: str,
    **kwargs
) -> ItemPaged[SharedAccessSignatureAuthorizationRule]:
    """Get security metadata for IoT hub including all shared access policies.

    Args:
        resource_group_name: Name of the resource group
        resource_name: Name of the IoT hub resource

    Returns:
        ItemPaged[SharedAccessSignatureAuthorizationRule]: All access policies
        with keys and permissions
    """
def get_keys_for_key_name(
resource_group_name: str,
resource_name: str,
key_name: str,
**kwargs
) -> SharedAccessSignatureAuthorizationRule:
"""
Get shared access policy by name from IoT hub.
Args:
resource_group_name: Name of the resource group
resource_name: Name of the IoT hub resource
key_name: Name of the access policy
Returns:
SharedAccessSignatureAuthorizationRule: Access policy with keys and permissions
"""

Comprehensive X.509 certificate lifecycle management for device authentication including upload, verification, and deletion operations.
def list_by_iot_hub(
    resource_group_name: str,
    resource_name: str,
    **kwargs
) -> CertificateListDescription:
    """Get list of certificates for IoT hub.

    Args:
        resource_group_name: Name of the resource group
        resource_name: Name of the IoT hub resource

    Returns:
        CertificateListDescription: List of all certificates with metadata
    """
def get(
    resource_group_name: str,
    resource_name: str,
    certificate_name: str,
    **kwargs
) -> CertificateDescription:
    """Get specific certificate details.

    Args:
        resource_group_name: Name of the resource group
        resource_name: Name of the IoT hub resource
        certificate_name: Name of the certificate

    Returns:
        CertificateDescription: Certificate details and metadata
    """
def create_or_update(
    resource_group_name: str,
    resource_name: str,
    certificate_name: str,
    certificate_description: CertificateDescription,
    if_match: Optional[str] = None,
    **kwargs
) -> CertificateDescription:
    """Upload certificate to IoT hub - add new or replace existing certificate.

    Args:
        resource_group_name: Name of the resource group
        resource_name: Name of the IoT hub resource
        certificate_name: Name for the certificate
        certificate_description: Certificate data and configuration
        if_match: ETag for conditional updates (optional)

    Returns:
        CertificateDescription: Uploaded certificate details
    """
def delete(
resource_group_name: str,
resource_name: str,
certificate_name: str,
if_match: str,
**kwargs
) -> None:
"""
Delete X509 certificate.
Args:
resource_group_name: Name of the resource group
resource_name: Name of the IoT hub resource
certificate_name: Name of the certificate to delete
if_match: ETag for conditional deletion (required)
"""

Proof of possession verification workflow for X.509 certificates to ensure certificate ownership before enabling device authentication.
def generate_verification_code(
    resource_group_name: str,
    resource_name: str,
    certificate_name: str,
    if_match: str,
    **kwargs
) -> CertificateWithNonceDescription:
    """Generate verification code for proof of possession flow.

    Args:
        resource_group_name: Name of the resource group
        resource_name: Name of the IoT hub resource
        certificate_name: Name of the certificate
        if_match: ETag for conditional operation (required)

    Returns:
        CertificateWithNonceDescription: Certificate details with verification code
    """
def verify(
resource_group_name: str,
resource_name: str,
certificate_name: str,
if_match: str,
certificate_verification_body: CertificateVerificationDescription,
**kwargs
) -> CertificateDescription:
"""
Verify certificate's private key possession.
Args:
resource_group_name: Name of the resource group
resource_name: Name of the IoT hub resource
certificate_name: Name of the certificate
if_match: ETag for conditional operation (required)
certificate_verification_body: Verification certificate signed with private key
Returns:
CertificateDescription: Verified certificate details
"""


from azure.identity import DefaultAzureCredential
from azure.mgmt.iothub import IotHubClient

credential = DefaultAzureCredential()
client = IotHubClient(credential, "subscription-id")

# List all access policies. The pager is iterated directly; there is no need
# to materialize it into a list first.
policies = client.iot_hub_resource.list_keys("my-resource-group", "my-iot-hub")
for policy in policies:
    print(f"Policy: {policy.key_name}")
    print(f" Rights: {policy.rights}")
    # Both keys are Optional[str]; fall back to "" so slicing never hits None.
    # Only a short prefix is printed - never log a full access key.
    print(f" Primary Key: {(policy.primary_key or '')[:10]}...")
    print(f" Secondary Key: {(policy.secondary_key or '')[:10]}...")

# Get specific policy
service_policy = client.iot_hub_resource.get_keys_for_key_name(
    "my-resource-group",
    "my-iot-hub",
    "service"
)
print(f"Service policy has rights: {service_policy.rights}")


from azure.mgmt.iothub.models import CertificateDescription, CertificateBodyDescription
from azure.mgmt.iothub.models import CertificateProperties

# Read certificate from file
with open("device-ca.pem", "r") as cert_file:
    cert_content = cert_file.read()

# Create certificate description. CertificateDescription.properties is typed
# as CertificateProperties, whose writable `certificate` field carries the
# content to upload, so build that model rather than a CertificateBodyDescription.
cert_properties = CertificateProperties(certificate=cert_content)
cert_description = CertificateDescription(properties=cert_properties)

# Upload certificate
uploaded_cert = client.certificates.create_or_update(
    "my-resource-group",
    "my-iot-hub",
    "device-ca-cert",
    cert_description
)
print(f"Uploaded certificate: {uploaded_cert.name}")
print(f"Subject: {uploaded_cert.properties.subject}")
print(f"Thumbprint: {uploaded_cert.properties.thumbprint}")
print(f"Is Verified: {uploaded_cert.properties.is_verified}")


from azure.mgmt.iothub.models import CertificateVerificationDescription
# Step 1: Ask the hub for a verification code. The ETag of the certificate
# uploaded earlier is passed as the if_match precondition.
verification_cert = client.certificates.generate_verification_code(
    resource_group_name="my-resource-group",
    resource_name="my-iot-hub",
    certificate_name="device-ca-cert",
    if_match=uploaded_cert.etag,
)
print(f"Verification code: {verification_cert.properties.verification_code}")

# Step 2: Create leaf certificate signed with private key
# (This step happens outside of Azure - you create a leaf certificate
# with the verification code as the subject, signed with your private key)

# Step 3: Load the signed verification certificate and wrap it in the
# request payload.
with open("verification-cert.pem", "r") as verify_file:
    verify_cert_content = verify_file.read()
verification_body = CertificateVerificationDescription(certificate=verify_cert_content)

# Step 4: Complete verification, using the ETag returned by step 1.
verified_cert = client.certificates.verify(
    resource_group_name="my-resource-group",
    resource_name="my-iot-hub",
    certificate_name="device-ca-cert",
    if_match=verification_cert.etag,
    certificate_verification_body=verification_body,
)
print(f"Certificate verified: {verified_cert.properties.is_verified}")


# List all certificates
from datetime import datetime, timezone


def _as_utc(moment):
    """Return *moment* as a timezone-aware UTC datetime (naive values are taken as UTC)."""
    if moment.tzinfo is None:
        return moment.replace(tzinfo=timezone.utc)
    return moment


# One aware "now" for every comparison below. A naive datetime.utcnow() cannot
# be subtracted from or compared with a timezone-aware timestamp (TypeError).
now = datetime.now(timezone.utc)

cert_list = client.certificates.list_by_iot_hub("my-resource-group", "my-iot-hub")
# `value` is Optional, so fall back to an empty list.
for cert in cert_list.value or []:
    print(f"Certificate: {cert.name}")
    print(f" Subject: {cert.properties.subject}")
    print(f" Expiry: {cert.properties.expiry_time_utc}")
    print(f" Verified: {cert.properties.is_verified}")
    # Check if certificate is expiring soon
    if cert.properties.expiry_time_utc:
        days_until_expiry = (_as_utc(cert.properties.expiry_time_utc) - now).days
        if days_until_expiry < 30:
            print(f" WARNING: Certificate expires in {days_until_expiry} days!")

# Delete expired certificates
expired_cert = client.certificates.get("my-resource-group", "my-iot-hub", "old-cert")
expiry = expired_cert.properties.expiry_time_utc
# The expiry is Optional; only delete when it is known and already in the past.
if expiry is not None and _as_utc(expiry) < now:
    client.certificates.delete(
        "my-resource-group",
        "my-iot-hub",
        "old-cert",
        expired_cert.etag
    )
    print("Deleted expired certificate")


class SharedAccessSignatureAuthorizationRule:
    """Shared access signature authorization rule.

    Attributes:
        key_name: Policy name (required)
        primary_key: Primary access key
        secondary_key: Secondary access key
        rights: Access permissions (required)
    """

    key_name: str
    primary_key: Optional[str]
    secondary_key: Optional[str]
    rights: AccessRights
class CertificateDescription:
    """X.509 certificate representation.

    Attributes:
        properties: Certificate properties and metadata
        name: Certificate name (readonly)
        etag: ETag for concurrency control (readonly)
        type: Resource type (readonly)
    """

    properties: Optional[CertificateProperties]
    name: Optional[str]
    etag: Optional[str]
    type: Optional[str]
class CertificateProperties:
    """X.509 certificate properties.

    Attributes:
        subject: Certificate subject (readonly)
        expiry_time_utc: Certificate expiration time (readonly)
        thumbprint: Certificate thumbprint (readonly)
        is_verified: Whether certificate ownership is verified (readonly)
        created: Certificate creation time (readonly)
        updated: Certificate last update time (readonly)
        certificate: Certificate content for upload
    """

    subject: Optional[str]
    expiry_time_utc: Optional[datetime]
    thumbprint: Optional[str]
    is_verified: Optional[bool]
    created: Optional[datetime]
    updated: Optional[datetime]
    certificate: Optional[str]
class CertificateBodyDescription:
    """Certificate upload payload.

    Attributes:
        certificate: Base64 encoded certificate content (required)
    """

    certificate: str
class CertificateWithNonceDescription:
    """Certificate with verification code.

    Attributes:
        properties: Certificate properties including verification code
        name: Certificate name (readonly)
        etag: ETag for concurrency control (readonly)
        type: Resource type (readonly)
    """

    properties: Optional[CertificatePropertiesWithNonce]
    name: Optional[str]
    etag: Optional[str]
    type: Optional[str]
class CertificatePropertiesWithNonce:
    """Certificate properties with verification code.

    Attributes:
        subject: Certificate subject (readonly)
        expiry_time_utc: Certificate expiration time (readonly)
        thumbprint: Certificate thumbprint (readonly)
        is_verified: Whether certificate ownership is verified (readonly)
        created: Certificate creation time (readonly)
        updated: Certificate last update time (readonly)
        verification_code: Code for proof of possession verification (readonly)
        certificate: Certificate content
    """

    subject: Optional[str]
    expiry_time_utc: Optional[datetime]
    thumbprint: Optional[str]
    is_verified: Optional[bool]
    created: Optional[datetime]
    updated: Optional[datetime]
    verification_code: Optional[str]
    certificate: Optional[str]
class CertificateVerificationDescription:
    """Certificate verification payload.

    Attributes:
        certificate: Verification certificate signed with private key (required)
    """

    certificate: str
class CertificateListDescription:
"""
List of certificates.
Attributes:
value: Array of certificate descriptions
"""
value: Optional[List[CertificateDescription]]

Install with Tessl CLI
npx tessl i tessl/pypi-azure-mgmt-iothub