AWS IoT SDK based on the AWS Common Runtime for connecting IoT devices to AWS IoT Core services
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete Greengrass Core Inter-Process Communication client providing access to all Greengrass Core services including component management, configuration, local deployment, pub/sub messaging, device shadow operations, security services, secrets management, and metrics collection.
def connect(**kwargs):
"""
Create IPC connection to Greengrass Core.
Parameters:
- ipc_socket (str): IPC socket path (optional, defaults to environment variable)
- authtoken (str): Authentication token (optional, defaults to environment variable)
- lifecycle_handler: Lifecycle event handler (optional)
- timeout (float): Connection timeout in seconds (optional)
Returns:
GreengrassCoreIPCClient: Connected IPC client
"""Usage example:
from awsiot.greengrasscoreipc import connect
# Connect using environment variables (typical for Greengrass components)
ipc_client = connect()
# Or specify connection parameters explicitly
ipc_client = connect(
ipc_socket="/tmp/greengrass_ipc.sock",
authtoken="your-auth-token"
)class GreengrassCoreIPCClient:
"""
V1 client for Greengrass Core IPC operations with callback-based interface.
"""
def __init__(self, connection): ...def new_get_component_details(self):
"""Create operation to get component details."""
def new_list_components(self):
"""Create operation to list components."""
def new_restart_component(self):
"""Create operation to restart a component."""
def new_stop_component(self):
"""Create operation to stop a component."""
def new_pause_component(self):
"""Create operation to pause a component."""
def new_resume_component(self):
"""Create operation to resume a component."""def new_get_configuration(self):
"""Create operation to get configuration."""
def new_update_configuration(self):
"""Create operation to update configuration."""
def new_subscribe_to_configuration_update(self):
"""Create streaming operation to subscribe to configuration updates."""def new_create_local_deployment(self):
"""Create operation to create local deployment."""
def new_get_local_deployment_status(self):
"""Create operation to get local deployment status."""
def new_list_local_deployments(self):
"""Create operation to list local deployments."""
def new_cancel_local_deployment(self):
"""Create operation to cancel local deployment."""def new_publish_to_iot_core(self):
"""Create operation to publish message to IoT Core."""
def new_subscribe_to_iot_core(self):
"""Create streaming operation to subscribe to IoT Core topics."""def new_publish_to_topic(self):
"""Create operation to publish to local topic."""
def new_subscribe_to_topic(self):
"""Create streaming operation to subscribe to local topics."""def new_get_thing_shadow(self):
"""Create operation to get thing shadow."""
def new_update_thing_shadow(self):
"""Create operation to update thing shadow."""
def new_delete_thing_shadow(self):
"""Create operation to delete thing shadow."""
def new_list_named_shadows_for_thing(self):
"""Create operation to list named shadows for thing."""def new_get_client_device_auth_token(self):
"""Create operation to get client device auth token."""
def new_verify_client_device_identity(self):
"""Create operation to verify client device identity."""
def new_authorize_client_device_action(self):
"""Create operation to authorize client device action."""def new_get_secret_value(self):
"""Create operation to get secret value."""def new_put_component_metric(self):
"""Create operation to put component metric."""def new_update_state(self):
"""Create operation to update component state."""
def new_defer_component_update(self):
"""Create operation to defer component update."""def new_subscribe_to_certificate_updates(self):
"""Create streaming operation to subscribe to certificate updates."""class GreengrassCoreIPCClientV2:
"""
V2 client for Greengrass Core IPC operations with Future-based interface.
"""
def __init__(self, connection): ...The V2 client provides the same operations as V1 but with Future-based return types for better async/await support.
@dataclass
class GetComponentDetailsRequest:
"""Request to get component details."""
component_name: str
@dataclass
class GetComponentDetailsResponse:
"""Response containing component details."""
component_details: Optional[ComponentDetails] = None
@dataclass
class ComponentDetails:
"""Details about a Greengrass component."""
component_name: Optional[str] = None
version: Optional[str] = None
state: Optional[str] = None # LifecycleState
configuration: Optional[Dict[str, Any]] = None
@dataclass
class ListComponentsRequest:
"""Request to list components."""
pass
@dataclass
class ListComponentsResponse:
"""Response containing component list."""
components: Optional[List[ComponentDetails]] = None
@dataclass
class RestartComponentRequest:
"""Request to restart component."""
component_name: str
@dataclass
class RestartComponentResponse:
"""Response from restart component operation."""
restart_status: Optional[str] = None # RequestStatus
message: Optional[str] = None@dataclass
class GetConfigurationRequest:
"""Request to get configuration."""
component_name: Optional[str] = None
key_path: Optional[List[str]] = None
@dataclass
class GetConfigurationResponse:
"""Response containing configuration."""
component_name: Optional[str] = None
value: Optional[Dict[str, Any]] = None
@dataclass
class UpdateConfigurationRequest:
"""Request to update configuration."""
key_path: Optional[List[str]] = None
timestamp: Optional[datetime.datetime] = None
value_to_merge: Optional[Dict[str, Any]] = None
@dataclass
class UpdateConfigurationResponse:
"""Response from configuration update."""
pass
@dataclass
class ConfigurationUpdateEvent:
"""Event for configuration updates."""
component_name: Optional[str] = None
key_path: Optional[List[str]] = None@dataclass
class CreateLocalDeploymentRequest:
"""Request to create local deployment."""
group_name: str
deployment_id: Optional[str] = None
components_to_add: Optional[Dict[str, ComponentDeploymentSpecification]] = None
components_to_remove: Optional[List[str]] = None
components_to_merge: Optional[Dict[str, ComponentDeploymentSpecification]] = None
failure_handling_policy: Optional[str] = None # FailureHandlingPolicy
recipe_directory_path: Optional[str] = None
artifacts_directory_path: Optional[str] = None
@dataclass
class CreateLocalDeploymentResponse:
"""Response from create local deployment."""
deployment_id: Optional[str] = None
@dataclass
class GetLocalDeploymentStatusRequest:
"""Request to get deployment status."""
deployment_id: str
@dataclass
class GetLocalDeploymentStatusResponse:
"""Response containing deployment status."""
deployment: Optional[LocalDeployment] = None@dataclass
class PublishToTopicRequest:
"""Request to publish to local topic."""
topic: str
publish_message: Optional[PublishMessage] = None
@dataclass
class PublishToTopicResponse:
"""Response from publish to topic."""
pass
@dataclass
class PublishMessage:
"""Message to publish."""
json_message: Optional[Dict[str, Any]] = None
binary_message: Optional[BinaryMessage] = None
@dataclass
class SubscribeToTopicRequest:
"""Request to subscribe to local topic."""
topic: str
@dataclass
class SubscriptionResponseMessage:
"""Message received from subscription."""
json_message: Optional[Dict[str, Any]] = None
binary_message: Optional[BinaryMessage] = None
context: Optional[MessageContext] = None@dataclass
class PublishToIoTCoreRequest:
"""Request to publish to IoT Core."""
topic_name: str
qos: Optional[str] = None # QOS
payload: Optional[bytes] = None
retain: Optional[bool] = None
user_properties: Optional[List[UserProperty]] = None
message_expiry_interval_seconds: Optional[int] = None
correlation_data: Optional[bytes] = None
response_topic: Optional[str] = None
payload_format: Optional[str] = None # PayloadFormat
content_type: Optional[str] = None
@dataclass
class PublishToIoTCoreResponse:
"""Response from publish to IoT Core."""
pass
@dataclass
class SubscribeToIoTCoreRequest:
"""Request to subscribe to IoT Core topic."""
topic_name: str
qos: Optional[str] = None # QOS
@dataclass
class IoTCoreMessage:
"""Message from IoT Core."""
message: Optional[MQTTMessage] = None@dataclass
class GetThingShadowRequest:
"""Request to get thing shadow."""
thing_name: str
shadow_name: Optional[str] = None
@dataclass
class GetThingShadowResponse:
"""Response containing thing shadow."""
payload: Optional[bytes] = None
@dataclass
class UpdateThingShadowRequest:
"""Request to update thing shadow."""
thing_name: str
payload: bytes
shadow_name: Optional[str] = None
@dataclass
class UpdateThingShadowResponse:
"""Response from update thing shadow."""
payload: Optional[bytes] = None
@dataclass
class DeleteThingShadowRequest:
"""Request to delete thing shadow."""
thing_name: str
shadow_name: Optional[str] = None
@dataclass
class DeleteThingShadowResponse:
"""Response from delete thing shadow."""
payload: Optional[bytes] = None@dataclass
class GetClientDeviceAuthTokenRequest:
"""Request to get client device auth token."""
credential: Optional[CredentialDocument] = None
@dataclass
class GetClientDeviceAuthTokenResponse:
"""Response containing auth token."""
client_device_auth_token: Optional[str] = None
@dataclass
class VerifyClientDeviceIdentityRequest:
"""Request to verify client device identity."""
credential: Optional[CredentialDocument] = None
@dataclass
class VerifyClientDeviceIdentityResponse:
"""Response from identity verification."""
is_valid_client_device: Optional[bool] = None
@dataclass
class AuthorizeClientDeviceActionRequest:
"""Request to authorize client device action."""
client_device_auth_token: str
operation: str
resource: str
@dataclass
class AuthorizeClientDeviceActionResponse:
"""Response from authorization check."""
is_authorized: Optional[bool] = None@dataclass
class GetSecretValueRequest:
"""Request to get secret value."""
secret_id: str
version_id: Optional[str] = None
version_stage: Optional[str] = None
@dataclass
class GetSecretValueResponse:
"""Response containing secret value."""
secret_id: Optional[str] = None
version_id: Optional[str] = None
secret_binary: Optional[bytes] = None
secret_string: Optional[str] = None@dataclass
class PutComponentMetricRequest:
"""Request to put component metric."""
metrics: List[Metric]
@dataclass
class PutComponentMetricResponse:
"""Response from put metric operation."""
pass
@dataclass
class Metric:
"""Component metric data."""
name: str
unit: Optional[str] = None # MetricUnitType
value: Optional[float] = None
timestamp: Optional[datetime.datetime] = Noneclass LifecycleState:
"""Component lifecycle state constants."""
NEW = "NEW"
INSTALLED = "INSTALLED"
STARTING = "STARTING"
RUNNING = "RUNNING"
STOPPING = "STOPPING"
ERRORED = "ERRORED"
BROKEN = "BROKEN"
FINISHED = "FINISHED"class RequestStatus:
"""Request status constants."""
SUCCEEDED = "SUCCEEDED"
FAILED = "FAILED"class QOS:
"""MQTT QoS level constants."""
AT_MOST_ONCE = "AT_MOST_ONCE"
AT_LEAST_ONCE = "AT_LEAST_ONCE"from awsiot.greengrasscoreipc import connect
import json
# Connect to Greengrass Core
ipc_client = connect()
def list_all_components():
"""List all Greengrass components."""
try:
# Create operation
operation = ipc_client.new_list_components()
# Activate operation
operation.activate({})
# Get result
result = operation.get_response().result()
print("Greengrass Components:")
for component in result.components:
print(f" Name: {component.component_name}")
print(f" Version: {component.version}")
print(f" State: {component.state}")
print(" ---")
return result.components
except Exception as e:
print(f"Failed to list components: {e}")
return []
def get_component_details(component_name):
"""Get details for a specific component."""
try:
operation = ipc_client.new_get_component_details()
request = {
"componentName": component_name
}
operation.activate(request)
result = operation.get_response().result()
details = result.component_details
print(f"Component: {details.component_name}")
print(f"Version: {details.version}")
print(f"State: {details.state}")
print(f"Configuration: {json.dumps(details.configuration, indent=2)}")
return details
except Exception as e:
print(f"Failed to get component details: {e}")
return None
def restart_component(component_name):
"""Restart a Greengrass component."""
try:
operation = ipc_client.new_restart_component()
request = {
"componentName": component_name
}
operation.activate(request)
result = operation.get_response().result()
print(f"Restart status: {result.restart_status}")
if result.message:
print(f"Message: {result.message}")
return result.restart_status == "SUCCEEDED"
except Exception as e:
print(f"Failed to restart component: {e}")
return False
# Usage
components = list_all_components()
if components:
details = get_component_details(components[0].component_name)
# Restart a component (be careful with this!)
# restart_component("YourComponentName")from awsiot.greengrasscoreipc import connect
import json
import time
import threading
ipc_client = connect()
def publish_to_local_topic(topic, message):
"""Publish message to local Greengrass topic."""
try:
operation = ipc_client.new_publish_to_topic()
request = {
"topic": topic,
"publishMessage": {
"jsonMessage": message
}
}
operation.activate(request)
result = operation.get_response().result()
print(f"Published to topic {topic}: {message}")
return True
except Exception as e:
print(f"Failed to publish: {e}")
return False
def subscribe_to_local_topic(topic, callback):
"""Subscribe to local Greengrass topic."""
try:
operation = ipc_client.new_subscribe_to_topic()
request = {
"topic": topic
}
def stream_handler(event):
"""Handle incoming messages."""
try:
if hasattr(event, 'json_message') and event.json_message:
callback(topic, event.json_message)
elif hasattr(event, 'binary_message') and event.binary_message:
callback(topic, event.binary_message.message)
except Exception as e:
print(f"Error handling message: {e}")
# Activate subscription
operation.activate(request)
# Set up message handler
operation.get_response().add_done_callback(
lambda future: print(f"Subscription to {topic} established")
)
# Handle stream events
stream = operation.get_response().result()
while True:
try:
event = stream.get_next_message().result(timeout=1.0)
stream_handler(event)
except Exception:
# Timeout or stream closed
break
except Exception as e:
print(f"Failed to subscribe to topic: {e}")
def message_handler(topic, message):
"""Handle received messages."""
print(f"Received on {topic}: {message}")
# Example usage
def pub_sub_example():
"""Example of local pub/sub communication."""
# Start subscriber in background thread
subscribe_thread = threading.Thread(
target=subscribe_to_local_topic,
args=("sensor/temperature", message_handler)
)
subscribe_thread.daemon = True
subscribe_thread.start()
# Give subscriber time to connect
time.sleep(1)
# Publish some messages
for i in range(5):
message = {
"temperature": 20.0 + i,
"humidity": 45.0 + i * 2,
"timestamp": time.time()
}
publish_to_local_topic("sensor/temperature", message)
time.sleep(2)
# Run example
pub_sub_example()from awsiot.greengrasscoreipc import connect
import json
ipc_client = connect()
def get_component_configuration(component_name, key_path=None):
"""Get component configuration."""
try:
operation = ipc_client.new_get_configuration()
request = {
"componentName": component_name
}
if key_path:
request["keyPath"] = key_path
operation.activate(request)
result = operation.get_response().result()
print(f"Configuration for {component_name}:")
print(json.dumps(result.value, indent=2))
return result.value
except Exception as e:
print(f"Failed to get configuration: {e}")
return None
def update_component_configuration(key_path, value_to_merge):
"""Update component configuration."""
try:
operation = ipc_client.new_update_configuration()
request = {
"keyPath": key_path,
"valueToMerge": value_to_merge
}
operation.activate(request)
result = operation.get_response().result()
print("Configuration updated successfully")
return True
except Exception as e:
print(f"Failed to update configuration: {e}")
return False
def subscribe_to_config_updates():
"""Subscribe to configuration update events."""
try:
operation = ipc_client.new_subscribe_to_configuration_update()
request = {}
def config_update_handler(event):
"""Handle configuration update events."""
print(f"Configuration updated:")
print(f" Component: {event.component_name}")
print(f" Key path: {event.key_path}")
operation.activate(request)
# Handle stream events
stream = operation.get_response().result()
while True:
try:
event = stream.get_next_message().result(timeout=30.0)
config_update_handler(event)
except Exception:
break
except Exception as e:
print(f"Failed to subscribe to config updates: {e}")
# Usage examples
config = get_component_configuration("MyComponent")
# Update configuration
update_component_configuration(
["mqtt", "timeout"],
{"connectionTimeout": 30}
)from awsiot.greengrasscoreipc import connect
import json
ipc_client = connect()
def get_device_shadow(thing_name, shadow_name=None):
"""Get device shadow."""
try:
operation = ipc_client.new_get_thing_shadow()
request = {
"thingName": thing_name
}
if shadow_name:
request["shadowName"] = shadow_name
operation.activate(request)
result = operation.get_response().result()
# Parse shadow payload
shadow_data = json.loads(result.payload.decode())
print(f"Shadow for {thing_name}:")
print(json.dumps(shadow_data, indent=2))
return shadow_data
except Exception as e:
print(f"Failed to get shadow: {e}")
return None
def update_device_shadow(thing_name, shadow_state, shadow_name=None):
"""Update device shadow."""
try:
operation = ipc_client.new_update_thing_shadow()
shadow_document = {
"state": shadow_state
}
request = {
"thingName": thing_name,
"payload": json.dumps(shadow_document).encode()
}
if shadow_name:
request["shadowName"] = shadow_name
operation.activate(request)
result = operation.get_response().result()
# Parse response
response_data = json.loads(result.payload.decode())
print(f"Shadow updated for {thing_name}:")
print(json.dumps(response_data, indent=2))
return response_data
except Exception as e:
print(f"Failed to update shadow: {e}")
return None
# Usage
shadow = get_device_shadow("MyGreengrassDevice")
# Update shadow
updated_shadow = update_device_shadow(
"MyGreengrassDevice",
{
"reported": {
"temperature": 23.5,
"status": "online",
"last_update": "2023-12-07T10:30:00Z"
}
}
)from awsiot.greengrasscoreipc import connect
ipc_client = connect()
def get_secret(secret_id, version_id=None, version_stage=None):
"""Get secret value from AWS Secrets Manager."""
try:
operation = ipc_client.new_get_secret_value()
request = {
"secretId": secret_id
}
if version_id:
request["versionId"] = version_id
if version_stage:
request["versionStage"] = version_stage
operation.activate(request)
result = operation.get_response().result()
print(f"Retrieved secret: {result.secret_id}")
if result.secret_string:
return result.secret_string
elif result.secret_binary:
return result.secret_binary
else:
return None
except Exception as e:
print(f"Failed to get secret: {e}")
return None
# Usage
database_password = get_secret("prod/database/password")
if database_password:
print("Database password retrieved successfully")
# Use the password in your applicationInstall with Tessl CLI
npx tessl i tessl/pypi-awsiotsdk