AWS IoT SDK based on the AWS Common Runtime for connecting IoT devices to AWS IoT Core services
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete AWS IoT Device Shadow functionality including get, update, delete operations for both classic and named shadows, with support for shadow delta events and real-time shadow document updates through both V1 (callback-based) and V2 (Future-based) client interfaces.
The V1 client provides callback-based operations following the traditional publish/subscribe pattern.
class IotShadowClient(MqttServiceClient):
"""
V1 client for AWS IoT Device Shadow service with callback-based operations.
Parameters:
- mqtt_connection: MQTT connection (mqtt.Connection or mqtt5.Client)
"""
def __init__(self, mqtt_connection): ...

Usage example:
from awsiot import mqtt_connection_builder, iotshadow
# Create MQTT connection
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/certificate.pem.crt",
pri_key_filepath="/path/to/private.pem.key",
client_id="shadow-client-123"
)
# Create shadow client
shadow_client = iotshadow.IotShadowClient(connection)

def publish_get_shadow(self, request, qos):
"""
Publish request to get device shadow.
Parameters:
- request (GetShadowRequest): Shadow request
- qos (awscrt.mqtt.QoS): Quality of service
Returns:
Future: Future that completes when request is published
"""
def subscribe_to_get_shadow_accepted(self, request, qos, callback):
"""
Subscribe to successful get shadow responses.
Parameters:
- request (GetShadowSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when shadow is retrieved
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""
def subscribe_to_get_shadow_rejected(self, request, qos, callback):
"""
Subscribe to rejected get shadow responses.
Parameters:
- request (GetShadowSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when request is rejected
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""

def publish_update_shadow(self, request, qos):
"""
Publish request to update device shadow.
Parameters:
- request (UpdateShadowRequest): Shadow update request
- qos (awscrt.mqtt.QoS): Quality of service
Returns:
Future: Future that completes when request is published
"""
def subscribe_to_update_shadow_accepted(self, request, qos, callback):
"""
Subscribe to successful update shadow responses.
Parameters:
- request (UpdateShadowSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when update succeeds
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""
def subscribe_to_update_shadow_rejected(self, request, qos, callback):
"""
Subscribe to rejected update shadow responses.
Parameters:
- request (UpdateShadowSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when update is rejected
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""

def publish_delete_shadow(self, request, qos):
"""
Publish request to delete device shadow.
Parameters:
- request (DeleteShadowRequest): Shadow delete request
- qos (awscrt.mqtt.QoS): Quality of service
Returns:
Future: Future that completes when request is published
"""
def subscribe_to_delete_shadow_accepted(self, request, qos, callback):
"""
Subscribe to successful delete shadow responses.
"""
def subscribe_to_delete_shadow_rejected(self, request, qos, callback):
"""
Subscribe to rejected delete shadow responses.
"""

Named shadows allow multiple shadows per device. All classic shadow operations have named shadow equivalents:
def publish_get_named_shadow(self, request, qos): ...
def publish_update_named_shadow(self, request, qos): ...
def publish_delete_named_shadow(self, request, qos): ...
def subscribe_to_get_named_shadow_accepted(self, request, qos, callback): ...
def subscribe_to_get_named_shadow_rejected(self, request, qos, callback): ...
def subscribe_to_update_named_shadow_accepted(self, request, qos, callback): ...
def subscribe_to_update_named_shadow_rejected(self, request, qos, callback): ...
def subscribe_to_delete_named_shadow_accepted(self, request, qos, callback): ...
def subscribe_to_delete_named_shadow_rejected(self, request, qos, callback): ...

Subscribe to shadow delta events when the desired and reported states differ:
def subscribe_to_shadow_delta_events(self, request, qos, callback):
"""
Subscribe to shadow delta events for classic shadow.
Parameters:
- request (ShadowDeltaUpdatedSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when delta event occurs
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""
def subscribe_to_named_shadow_delta_events(self, request, qos, callback):
"""
Subscribe to shadow delta events for named shadow.
"""

Subscribe to shadow document update events:
def subscribe_to_shadow_documents_events(self, request, qos, callback):
"""
Subscribe to shadow document update events for classic shadow.
Parameters:
- request (ShadowUpdatedSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when shadow document updates
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""
def subscribe_to_named_shadow_documents_events(self, request, qos, callback):
"""
Subscribe to shadow document update events for named shadow.
"""

The V2 client provides Future-based operations with request-response semantics.
class IotShadowClientV2:
"""
V2 client for AWS IoT Device Shadow service with Future-based operations.
"""
def __init__(self, connection): ...

def get_shadow(self, request):
"""
Get device shadow using request-response pattern.
Parameters:
- request (GetShadowRequest): Shadow request
Returns:
Future[GetShadowResponse]: Future containing shadow response
"""
def update_shadow(self, request):
"""
Update device shadow using request-response pattern.
Parameters:
- request (UpdateShadowRequest): Shadow update request
Returns:
Future[UpdateShadowResponse]: Future containing update response
"""
def delete_shadow(self, request):
"""
Delete device shadow using request-response pattern.
Parameters:
- request (DeleteShadowRequest): Shadow delete request
Returns:
Future[DeleteShadowResponse]: Future containing delete response
"""

def get_named_shadow(self, request):
"""Get named shadow using request-response pattern."""
def update_named_shadow(self, request):
"""Update named shadow using request-response pattern."""
def delete_named_shadow(self, request):
"""Delete named shadow using request-response pattern."""

def create_shadow_delta_stream(self, request, options):
"""
Create streaming operation for shadow delta events.
Parameters:
- request (ShadowDeltaUpdatedSubscriptionRequest): Stream request
- options (ServiceStreamOptions): Stream configuration
Returns:
StreamingOperation: Streaming operation handle
"""
def create_named_shadow_delta_stream(self, request, options):
"""Create streaming operation for named shadow delta events."""
def create_shadow_documents_stream(self, request, options):
"""Create streaming operation for shadow document events."""
def create_named_shadow_documents_stream(self, request, options):
"""Create streaming operation for named shadow document events."""

@dataclass
class GetShadowRequest:
"""Request to get device shadow."""
thing_name: str
@dataclass
class GetNamedShadowRequest:
"""Request to get named shadow."""
thing_name: str
shadow_name: str
@dataclass
class UpdateShadowRequest:
"""Request to update device shadow."""
thing_name: str
state: Optional[ShadowState] = None
client_token: Optional[str] = None
@dataclass
class UpdateNamedShadowRequest:
"""Request to update named shadow."""
thing_name: str
shadow_name: str
state: Optional[ShadowState] = None
client_token: Optional[str] = None
@dataclass
class DeleteShadowRequest:
"""Request to delete device shadow."""
thing_name: str
@dataclass
class DeleteNamedShadowRequest:
"""Request to delete named shadow."""
thing_name: str
shadow_name: str

@dataclass
class GetShadowResponse:
"""Response from get shadow operation."""
state: Optional[ShadowStateWithDelta] = None
metadata: Optional[ShadowMetadata] = None
version: Optional[int] = None
timestamp: Optional[datetime.datetime] = None
client_token: Optional[str] = None
@dataclass
class UpdateShadowResponse:
"""Response from update shadow operation."""
state: Optional[ShadowState] = None
metadata: Optional[ShadowMetadata] = None
version: Optional[int] = None
timestamp: Optional[datetime.datetime] = None
client_token: Optional[str] = None
@dataclass
class DeleteShadowResponse:
"""Response from delete shadow operation."""
version: Optional[int] = None
timestamp: Optional[datetime.datetime] = None
client_token: Optional[str] = None

@dataclass
class ShadowState:
"""Device shadow state containing desired and reported properties."""
desired: Optional[Dict[str, Any]] = None
reported: Optional[Dict[str, Any]] = None
@dataclass
class ShadowStateWithDelta:
"""Shadow state including delta between desired and reported."""
desired: Optional[Dict[str, Any]] = None
reported: Optional[Dict[str, Any]] = None
delta: Optional[Dict[str, Any]] = None
@dataclass
class ShadowMetadata:
"""Metadata about shadow properties including timestamps."""
desired: Optional[Dict[str, Any]] = None
reported: Optional[Dict[str, Any]] = None

@dataclass
class ShadowDeltaUpdatedEvent:
"""Event published when shadow delta changes."""
version: Optional[int] = None
timestamp: Optional[datetime.datetime] = None
state: Optional[Dict[str, Any]] = None
metadata: Optional[Dict[str, Any]] = None
client_token: Optional[str] = None
@dataclass
class ShadowUpdatedEvent:
"""Event published when shadow document updates."""
previous: Optional[ShadowUpdatedSnapshot] = None
current: Optional[ShadowUpdatedSnapshot] = None
timestamp: Optional[datetime.datetime] = None
client_token: Optional[str] = None

@dataclass
class GetShadowSubscriptionRequest:
"""Subscription request for get shadow responses."""
thing_name: str
@dataclass
class ShadowDeltaUpdatedSubscriptionRequest:
"""Subscription request for shadow delta events."""
thing_name: str
@dataclass
class ShadowUpdatedSubscriptionRequest:
"""Subscription request for shadow document events."""
thing_name: str

@dataclass
class ErrorResponse:
"""Error response from shadow operations."""
code: Optional[int] = None
message: Optional[str] = None
timestamp: Optional[datetime.datetime] = None
client_token: Optional[str] = None

from awsiot import mqtt_connection_builder, iotshadow
from awscrt import mqtt
import json
# Create connection and client
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/certificate.pem.crt",
pri_key_filepath="/path/to/private.pem.key",
client_id="shadow-device-123"
)
shadow_client = iotshadow.IotShadowClient(connection)
connection.connect().result()
# Define callbacks
def on_update_accepted(response):
print(f"Shadow update accepted: {response}")
def on_update_rejected(error):
print(f"Shadow update rejected: {error}")
# Subscribe to responses
# subscribe_to_* returns a (Future, topic) tuple; wait on the Future
accepted_future, _ = shadow_client.subscribe_to_update_shadow_accepted(
iotshadow.UpdateShadowSubscriptionRequest(thing_name="MyDevice"),
mqtt.QoS.AT_LEAST_ONCE,
on_update_accepted
)
accepted_future.result()
rejected_future, _ = shadow_client.subscribe_to_update_shadow_rejected(
iotshadow.UpdateShadowSubscriptionRequest(thing_name="MyDevice"),
mqtt.QoS.AT_LEAST_ONCE,
on_update_rejected
)
rejected_future.result()
# Update shadow
update_request = iotshadow.UpdateShadowRequest(
thing_name="MyDevice",
state=iotshadow.ShadowState(
reported={"temperature": 23.5, "humidity": 60.2, "status": "online"}
)
)
shadow_client.publish_update_shadow(update_request, mqtt.QoS.AT_LEAST_ONCE).result()

def on_shadow_delta(delta_event):
"""Handle shadow delta events when desired != reported."""
print(f"Shadow delta received: {delta_event}")
# Extract delta changes
if delta_event.state:
for property_name, desired_value in delta_event.state.items():
print(f"Property '{property_name}' should be updated to: {desired_value}")
# Update device state and report back
# ... device-specific logic here ...
# Report updated state
update_request = iotshadow.UpdateShadowRequest(
thing_name="MyDevice",
state=iotshadow.ShadowState(
reported=delta_event.state # Report desired state as achieved
),
client_token=delta_event.client_token
)
shadow_client.publish_update_shadow(update_request, mqtt.QoS.AT_LEAST_ONCE)
# Subscribe to delta events
# subscribe_to_* returns a (Future, topic) tuple; wait on the Future
delta_future, _ = shadow_client.subscribe_to_shadow_delta_events(
iotshadow.ShadowDeltaUpdatedSubscriptionRequest(thing_name="MyDevice"),
mqtt.QoS.AT_LEAST_ONCE,
on_shadow_delta
)
delta_future.result()

from awsiot import mqtt_connection_builder, iotshadow
import asyncio
async def shadow_operations():
# Create connection
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/certificate.pem.crt",
pri_key_filepath="/path/to/private.pem.key",
client_id="shadow-v2-client"
)
# Create V2 client
shadow_client = iotshadow.IotShadowClientV2(connection)
# The SDK returns concurrent.futures.Future objects, which asyncio cannot
# await directly; asyncio.wrap_future adapts them to awaitables.
await asyncio.wrap_future(connection.connect())
try:
# Get current shadow
get_request = iotshadow.GetShadowRequest(thing_name="MyDevice")
get_response = await asyncio.wrap_future(shadow_client.get_shadow(get_request))
print(f"Current shadow: {get_response}")
# Update shadow
update_request = iotshadow.UpdateShadowRequest(
thing_name="MyDevice",
state=iotshadow.ShadowState(
desired={"target_temperature": 22.0},
reported={"current_temperature": 20.1}
)
)
update_response = await asyncio.wrap_future(shadow_client.update_shadow(update_request))
print(f"Update response: {update_response}")
except iotshadow.V2ServiceException as e:
print(f"Shadow operation failed: {e.message}")
if e.modeled_error:
print(f"Error details: {e.modeled_error}")
finally:
await asyncio.wrap_future(connection.disconnect())
# Run async operations
asyncio.run(shadow_operations())

# Using named shadows for multi-component devices
update_request = iotshadow.UpdateNamedShadowRequest(
thing_name="SmartHome",
shadow_name="thermostat",
state=iotshadow.ShadowState(
reported={
"temperature": 21.5,
"humidity": 45.0,
"mode": "heat",
"target_temperature": 22.0
}
)
)
# V1 client
shadow_client.publish_update_named_shadow(update_request, mqtt.QoS.AT_LEAST_ONCE)
# V2 client
# Outside a coroutine, block on the returned Future instead of using await
response = shadow_client.update_named_shadow(update_request).result()

Install with Tessl CLI
npx tessl i tessl/pypi-awsiotsdk