SDK for connecting to AWS IoT using Python.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Device shadow operations for synchronizing device state between physical devices and the cloud. Provides shadow retrieval, updates, deletion, and delta notifications for state changes. Device shadows are JSON documents that store and retrieve current state information for a device.
Create a specialized MQTT client for device shadow operations with automatic subscription management and optimized for time-sensitive shadow messages.
class AWSIoTMQTTShadowClient:
def __init__(self, clientID: str, protocolType: int = MQTTv3_1_1, useWebsocket: bool = False, cleanSession: bool = True, awsIoTMQTTClient = None):
"""
Create AWS IoT MQTT Shadow client.
Args:
clientID (str): Client identifier for MQTT connection
protocolType (int): MQTT version (MQTTv3_1=3, MQTTv3_1_1=4)
useWebsocket (bool): Enable MQTT over WebSocket SigV4
cleanSession (bool): Start with clean session state
awsIoTMQTTClient (AWSIoTMQTTClient): Existing MQTT client to reuse (optional)
"""Create individual shadow handlers for specific devices with configurable subscription persistence.
def createShadowHandlerWithName(self, shadowName: str, isPersistentSubscribe: bool) -> 'deviceShadow':
"""
Create device shadow handler for named shadow.
Args:
shadowName (str): Name of the device shadow
isPersistentSubscribe (bool): Whether to maintain persistent subscriptions to shadow topics
Returns:
deviceShadow: Shadow handler for shadow operations
"""Shadow clients inherit all connection management from the base MQTT client.
def getMQTTConnection(self) -> 'AWSIoTMQTTClient':
"""
Get underlying MQTT client for direct MQTT operations.
Returns:
AWSIoTMQTTClient: The underlying MQTT client instance
"""Individual device shadow operations for state management and synchronization.
class deviceShadow:
def shadowGet(self, callback: callable, timeout: int) -> str:
"""
Retrieve current shadow state from AWS IoT.
Args:
callback (callable): Response callback (topic, payload) -> None
timeout (int): Operation timeout in seconds
Returns:
str: Token for tracking this shadow operation
"""
def shadowUpdate(self, JSONPayload: str, callback: callable, timeout: int) -> str:
"""
Update shadow state in AWS IoT.
Args:
JSONPayload (str): JSON shadow update payload
callback (callable): Response callback (topic, payload) -> None
timeout (int): Operation timeout in seconds
Returns:
str: Token for tracking this shadow operation
"""
def shadowDelete(self, callback: callable, timeout: int) -> str:
"""
Delete shadow from AWS IoT.
Args:
callback (callable): Response callback (topic, payload) -> None
timeout (int): Operation timeout in seconds
Returns:
str: Token for tracking this shadow operation
"""Register for notifications when shadow desired state differs from reported state.
def shadowRegisterDeltaCallback(self, callback: callable):
"""
Register callback for shadow delta events.
Args:
callback (callable): Delta callback (topic, payload) -> None
"""
def shadowUnregisterDeltaCallback(self):
"""Unregister delta callback for this shadow."""import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
import json
import time
# Create shadow client
shadowClient = AWSIoTPyMQTT.AWSIoTMQTTShadowClient("myShadowClient")
shadowClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
shadowClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
# Configure connection
shadowClient.configureAutoReconnectBackoffTime(1, 32, 20)
shadowClient.configureConnectDisconnectTimeout(10)
shadowClient.configureMQTTOperationTimeout(5)
# Connect to AWS IoT
shadowClient.connect()
# Create device shadow handler
deviceShadowHandler = shadowClient.createShadowHandlerWithName("myDevice", True)
# Shadow response callbacks
def shadowUpdateCallback(payload, responseStatus, token):
if responseStatus == "timeout":
print("Shadow update request timed out")
elif responseStatus == "accepted":
print("Shadow update accepted")
elif responseStatus == "rejected":
print("Shadow update rejected")
def shadowGetCallback(payload, responseStatus, token):
if responseStatus == "timeout":
print("Shadow get request timed out")
elif responseStatus == "accepted":
payloadDict = json.loads(payload)
print(f"Shadow state: {payloadDict}")
elif responseStatus == "rejected":
print("Shadow get rejected")
# Get current shadow state
deviceShadowHandler.shadowGet(shadowGetCallback, 5)
# Update shadow state
shadowPayload = {
"state": {
"desired": {
"temperature": 25.0,
"humidity": 60.0
},
"reported": {
"temperature": 23.5,
"humidity": 58.0
}
}
}
deviceShadowHandler.shadowUpdate(json.dumps(shadowPayload), shadowUpdateCallback, 5)import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
import json
# Create and configure shadow client
shadowClient = AWSIoTPyMQTT.AWSIoTMQTTShadowClient("deltaClient")
shadowClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
shadowClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
shadowClient.connect()
# Create shadow handler
deviceShadowHandler = shadowClient.createShadowHandlerWithName("myDevice", True)
# Delta callback - called when desired state differs from reported state
def shadowDeltaCallback(payload, responseStatus, token):
print(f"Received shadow delta: {payload}")
deltaPayload = json.loads(payload)
# Extract delta state changes
if "state" in deltaPayload:
deltaState = deltaPayload["state"]
print(f"Delta state changes: {deltaState}")
# Apply delta changes to device
for key, value in deltaState.items():
print(f"Updating device {key} to {value}")
# Implement device-specific logic here
# Report updated state back to shadow
reportedPayload = {
"state": {
"reported": deltaState
}
}
deviceShadowHandler.shadowUpdate(json.dumps(reportedPayload), shadowUpdateCallback, 5)
def shadowUpdateCallback(payload, responseStatus, token):
if responseStatus == "accepted":
print("Device state updated successfully")
# Register for delta notifications
deviceShadowHandler.shadowRegisterDeltaCallback(shadowDeltaCallback)
# Keep client running to receive delta notifications
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
deviceShadowHandler.shadowUnregisterDeltaCallback()
shadowClient.disconnect()import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
import json
# Create shadow client
shadowClient = AWSIoTPyMQTT.AWSIoTMQTTShadowClient("multiShadowClient")
shadowClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
shadowClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
shadowClient.connect()
# Create multiple shadow handlers
thermostatShadow = shadowClient.createShadowHandlerWithName("thermostat", True)
lightingShadow = shadowClient.createShadowHandlerWithName("lighting", True)
securityShadow = shadowClient.createShadowHandlerWithName("security", True)
# Shadow-specific callbacks
def thermostatCallback(payload, responseStatus, token):
if responseStatus == "accepted":
print(f"Thermostat shadow updated: {payload}")
def lightingCallback(payload, responseStatus, token):
if responseStatus == "accepted":
print(f"Lighting shadow updated: {payload}")
def securityCallback(payload, responseStatus, token):
if responseStatus == "accepted":
print(f"Security shadow updated: {payload}")
# Update each shadow independently
thermostat_state = {
"state": {
"reported": {
"temperature": 22.0,
"target_temperature": 24.0
}
}
}
lighting_state = {
"state": {
"reported": {
"brightness": 80,
"color": {"r": 255, "g": 255, "b": 255}
}
}
}
security_state = {
"state": {
"reported": {
"armed": True,
"sensors": {"door": "closed", "motion": "inactive"}
}
}
}
# Perform shadow updates
thermostatShadow.shadowUpdate(json.dumps(thermostat_state), thermostatCallback, 5)
lightingShadow.shadowUpdate(json.dumps(lighting_state), lightingCallback, 5)
securityShadow.shadowUpdate(json.dumps(security_state), securityCallback, 5)# Example shadow document structure
shadow_document = {
"state": {
"desired": {
# Desired device state set by applications
"temperature": 25.0,
"mode": "heat"
},
"reported": {
# Current device state reported by device
"temperature": 23.5,
"mode": "heat",
"connected": True
},
"delta": {
# Difference between desired and reported (read-only)
"temperature": 1.5
}
},
"metadata": {
# Timestamps and version information
"desired": {
"temperature": {"timestamp": 1609459200},
"mode": {"timestamp": 1609459200}
},
"reported": {
"temperature": {"timestamp": 1609459180},
"mode": {"timestamp": 1609459180},
"connected": {"timestamp": 1609459180}
}
},
"version": 123,
"timestamp": 1609459200
}# Shadow response callback signature
def shadowCallback(payload: str, responseStatus: str, token: str) -> None:
"""
Shadow operation response callback.
Args:
payload (str): JSON response payload
responseStatus (str): "accepted", "rejected", or "timeout"
token (str): Token from the shadow operation
"""
# Shadow delta callback signature
def shadowDeltaCallback(payload: str, responseStatus: str, token: str) -> None:
"""
Shadow delta notification callback.
Args:
payload (str): JSON delta payload with state differences
responseStatus (str): Always "delta" for delta callbacks
token (str): Token from delta notification
"""
# Shadow operation response statuses
SHADOW_ACCEPTED = "accepted"
SHADOW_REJECTED = "rejected"
SHADOW_TIMEOUT = "timeout"
SHADOW_DELTA = "delta"Install with Tessl CLI
npx tessl i tessl/pypi-awsiotpythonsdk