CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-awsiotsdk

AWS IoT SDK based on the AWS Common Runtime for connecting IoT devices to AWS IoT Core services

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

greengrass-ipc.mddocs/

Greengrass Core IPC

Complete Greengrass Core Inter-Process Communication client providing access to all Greengrass Core services including component management, configuration, local deployment, pub/sub messaging, device shadow operations, security services, secrets management, and metrics collection.

Capabilities

Connection Setup

Connect Function

def connect(**kwargs):
    """
    Create IPC connection to Greengrass Core.
    
    Parameters:
    - ipc_socket (str): IPC socket path (optional, defaults to environment variable)
    - authtoken (str): Authentication token (optional, defaults to environment variable)
    - lifecycle_handler: Lifecycle event handler (optional)
    - timeout (float): Connection timeout in seconds (optional)
    
    Returns:
    GreengrassCoreIPCClient: Connected IPC client
    """

Usage example:

from awsiot.greengrasscoreipc import connect

# Connect using environment variables (typical for Greengrass components)
ipc_client = connect()

# Or specify connection parameters explicitly
ipc_client = connect(
    ipc_socket="/tmp/greengrass_ipc.sock",
    authtoken="your-auth-token"
)

V1 IPC Client

Client Class

class GreengrassCoreIPCClient:
    """
    V1 client for Greengrass Core IPC operations with callback-based interface.
    """
    def __init__(self, connection): ...

Component Management Operations

def new_get_component_details(self):
    """Create operation to get component details."""
    
def new_list_components(self):
    """Create operation to list components."""
    
def new_restart_component(self):
    """Create operation to restart a component."""
    
def new_stop_component(self):
    """Create operation to stop a component."""
    
def new_pause_component(self):
    """Create operation to pause a component."""
    
def new_resume_component(self):
    """Create operation to resume a component."""

Configuration Management Operations

def new_get_configuration(self):
    """Create operation to get configuration."""
    
def new_update_configuration(self):
    """Create operation to update configuration."""
    
def new_subscribe_to_configuration_update(self):
    """Create streaming operation to subscribe to configuration updates."""

Deployment Management Operations

def new_create_local_deployment(self):
    """Create operation to create local deployment."""
    
def new_get_local_deployment_status(self):
    """Create operation to get local deployment status."""
    
def new_list_local_deployments(self):
    """Create operation to list local deployments."""
    
def new_cancel_local_deployment(self):
    """Create operation to cancel local deployment."""

IoT Core Integration Operations

def new_publish_to_iot_core(self):
    """Create operation to publish message to IoT Core."""
    
def new_subscribe_to_iot_core(self):
    """Create streaming operation to subscribe to IoT Core topics."""

Local Pub/Sub Operations

def new_publish_to_topic(self):
    """Create operation to publish to local topic."""
    
def new_subscribe_to_topic(self):
    """Create streaming operation to subscribe to local topics."""

Device Shadow Operations

def new_get_thing_shadow(self):
    """Create operation to get thing shadow."""
    
def new_update_thing_shadow(self):
    """Create operation to update thing shadow."""
    
def new_delete_thing_shadow(self):
    """Create operation to delete thing shadow."""
    
def new_list_named_shadows_for_thing(self):
    """Create operation to list named shadows for thing."""

Security Operations

def new_get_client_device_auth_token(self):
    """Create operation to get client device auth token."""
    
def new_verify_client_device_identity(self):
    """Create operation to verify client device identity."""
    
def new_authorize_client_device_action(self):
    """Create operation to authorize client device action."""

Secrets Management Operations

def new_get_secret_value(self):
    """Create operation to get secret value."""

Metrics Operations

def new_put_component_metric(self):
    """Create operation to put component metric."""

State Management Operations

def new_update_state(self):
    """Create operation to update component state."""
    
def new_defer_component_update(self):
    """Create operation to defer component update."""

Certificate Management Operations

def new_subscribe_to_certificate_updates(self):
    """Create streaming operation to subscribe to certificate updates."""

V2 IPC Client

Client Class

class GreengrassCoreIPCClientV2:
    """
    V2 client for Greengrass Core IPC operations with Future-based interface.
    """
    def __init__(self, connection): ...

The V2 client provides the same operations as V1 but with Future-based return types for better async/await support.

Data Model Classes

Component Management

@dataclass
class GetComponentDetailsRequest:
    """Request to get component details."""
    component_name: str

@dataclass
class GetComponentDetailsResponse:
    """Response containing component details."""
    component_details: Optional[ComponentDetails] = None

@dataclass
class ComponentDetails:
    """Details about a Greengrass component."""
    component_name: Optional[str] = None
    version: Optional[str] = None
    state: Optional[str] = None  # LifecycleState
    configuration: Optional[Dict[str, Any]] = None

@dataclass
class ListComponentsRequest:
    """Request to list components."""
    pass

@dataclass
class ListComponentsResponse:
    """Response containing component list."""
    components: Optional[List[ComponentDetails]] = None

@dataclass
class RestartComponentRequest:
    """Request to restart component."""
    component_name: str

@dataclass
class RestartComponentResponse:
    """Response from restart component operation."""
    restart_status: Optional[str] = None  # RequestStatus
    message: Optional[str] = None

Configuration Management

@dataclass
class GetConfigurationRequest:
    """Request to get configuration."""
    component_name: Optional[str] = None
    key_path: Optional[List[str]] = None

@dataclass
class GetConfigurationResponse:
    """Response containing configuration."""
    component_name: Optional[str] = None
    value: Optional[Dict[str, Any]] = None

@dataclass
class UpdateConfigurationRequest:
    """Request to update configuration."""
    key_path: Optional[List[str]] = None
    timestamp: Optional[datetime.datetime] = None
    value_to_merge: Optional[Dict[str, Any]] = None

@dataclass
class UpdateConfigurationResponse:
    """Response from configuration update."""
    pass

@dataclass
class ConfigurationUpdateEvent:
    """Event for configuration updates."""
    component_name: Optional[str] = None
    key_path: Optional[List[str]] = None

Deployment Management

@dataclass
class CreateLocalDeploymentRequest:
    """Request to create local deployment."""
    group_name: str
    deployment_id: Optional[str] = None
    components_to_add: Optional[Dict[str, ComponentDeploymentSpecification]] = None
    components_to_remove: Optional[List[str]] = None
    components_to_merge: Optional[Dict[str, ComponentDeploymentSpecification]] = None
    failure_handling_policy: Optional[str] = None  # FailureHandlingPolicy
    recipe_directory_path: Optional[str] = None
    artifacts_directory_path: Optional[str] = None

@dataclass
class CreateLocalDeploymentResponse:
    """Response from create local deployment."""
    deployment_id: Optional[str] = None

@dataclass
class GetLocalDeploymentStatusRequest:
    """Request to get deployment status."""
    deployment_id: str

@dataclass
class GetLocalDeploymentStatusResponse:
    """Response containing deployment status."""
    deployment: Optional[LocalDeployment] = None

Pub/Sub Operations

@dataclass
class PublishToTopicRequest:
    """Request to publish to local topic."""
    topic: str
    publish_message: Optional[PublishMessage] = None

@dataclass
class PublishToTopicResponse:
    """Response from publish to topic."""
    pass

@dataclass
class PublishMessage:
    """Message to publish."""
    json_message: Optional[Dict[str, Any]] = None
    binary_message: Optional[BinaryMessage] = None

@dataclass
class SubscribeToTopicRequest:
    """Request to subscribe to local topic."""
    topic: str

@dataclass
class SubscriptionResponseMessage:
    """Message received from subscription."""
    json_message: Optional[Dict[str, Any]] = None
    binary_message: Optional[BinaryMessage] = None
    context: Optional[MessageContext] = None

IoT Core Integration

@dataclass
class PublishToIoTCoreRequest:
    """Request to publish to IoT Core."""
    topic_name: str
    qos: Optional[str] = None  # QOS
    payload: Optional[bytes] = None
    retain: Optional[bool] = None
    user_properties: Optional[List[UserProperty]] = None
    message_expiry_interval_seconds: Optional[int] = None
    correlation_data: Optional[bytes] = None
    response_topic: Optional[str] = None
    payload_format: Optional[str] = None  # PayloadFormat
    content_type: Optional[str] = None

@dataclass
class PublishToIoTCoreResponse:
    """Response from publish to IoT Core."""
    pass

@dataclass
class SubscribeToIoTCoreRequest:
    """Request to subscribe to IoT Core topic."""
    topic_name: str
    qos: Optional[str] = None  # QOS

@dataclass
class IoTCoreMessage:
    """Message from IoT Core."""
    message: Optional[MQTTMessage] = None

Device Shadow Operations

@dataclass
class GetThingShadowRequest:
    """Request to get thing shadow."""
    thing_name: str
    shadow_name: Optional[str] = None

@dataclass
class GetThingShadowResponse:
    """Response containing thing shadow."""
    payload: Optional[bytes] = None

@dataclass
class UpdateThingShadowRequest:
    """Request to update thing shadow."""
    thing_name: str
    payload: bytes
    shadow_name: Optional[str] = None

@dataclass
class UpdateThingShadowResponse:
    """Response from update thing shadow."""
    payload: Optional[bytes] = None

@dataclass
class DeleteThingShadowRequest:
    """Request to delete thing shadow."""
    thing_name: str
    shadow_name: Optional[str] = None

@dataclass
class DeleteThingShadowResponse:
    """Response from delete thing shadow."""
    payload: Optional[bytes] = None

Security Operations

@dataclass
class GetClientDeviceAuthTokenRequest:
    """Request to get client device auth token."""
    credential: Optional[CredentialDocument] = None

@dataclass
class GetClientDeviceAuthTokenResponse:
    """Response containing auth token."""
    client_device_auth_token: Optional[str] = None

@dataclass
class VerifyClientDeviceIdentityRequest:
    """Request to verify client device identity."""
    credential: Optional[CredentialDocument] = None

@dataclass
class VerifyClientDeviceIdentityResponse:
    """Response from identity verification."""
    is_valid_client_device: Optional[bool] = None

@dataclass
class AuthorizeClientDeviceActionRequest:
    """Request to authorize client device action."""
    client_device_auth_token: str
    operation: str
    resource: str

@dataclass
class AuthorizeClientDeviceActionResponse:
    """Response from authorization check."""
    is_authorized: Optional[bool] = None

Secrets Management

@dataclass
class GetSecretValueRequest:
    """Request to get secret value."""
    secret_id: str
    version_id: Optional[str] = None
    version_stage: Optional[str] = None

@dataclass
class GetSecretValueResponse:
    """Response containing secret value."""
    secret_id: Optional[str] = None
    version_id: Optional[str] = None
    secret_binary: Optional[bytes] = None
    secret_string: Optional[str] = None

Metrics

@dataclass
class PutComponentMetricRequest:
    """Request to put component metric."""
    metrics: List[Metric]

@dataclass
class PutComponentMetricResponse:
    """Response from put metric operation."""
    pass

@dataclass
class Metric:
    """Component metric data."""
    name: str
    unit: Optional[str] = None  # MetricUnitType
    value: Optional[float] = None
    timestamp: Optional[datetime.datetime] = None

Constants

Lifecycle States

class LifecycleState:
    """Component lifecycle state constants."""
    NEW = "NEW"
    INSTALLED = "INSTALLED"
    STARTING = "STARTING"
    RUNNING = "RUNNING"
    STOPPING = "STOPPING"
    ERRORED = "ERRORED"
    BROKEN = "BROKEN"
    FINISHED = "FINISHED"

Request Status

class RequestStatus:
    """Request status constants."""
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"

QoS Levels

class QOS:
    """MQTT QoS level constants."""
    AT_MOST_ONCE = "AT_MOST_ONCE"
    AT_LEAST_ONCE = "AT_LEAST_ONCE"

Usage Examples

Component Management

from awsiot.greengrasscoreipc import connect
import json

# Connect to Greengrass Core
ipc_client = connect()

def list_all_components():
    """List all Greengrass components."""
    try:
        # Create operation
        operation = ipc_client.new_list_components()
        
        # Activate operation  
        operation.activate({})
        
        # Get result
        result = operation.get_response().result()
        
        print("Greengrass Components:")
        for component in result.components:
            print(f"  Name: {component.component_name}")
            print(f"  Version: {component.version}")
            print(f"  State: {component.state}")
            print("  ---")
            
        return result.components
        
    except Exception as e:
        print(f"Failed to list components: {e}")
        return []

def get_component_details(component_name):
    """Get details for a specific component."""
    try:
        operation = ipc_client.new_get_component_details()
        
        request = {
            "componentName": component_name
        }
        
        operation.activate(request)
        result = operation.get_response().result()
        
        details = result.component_details
        print(f"Component: {details.component_name}")
        print(f"Version: {details.version}")
        print(f"State: {details.state}")
        print(f"Configuration: {json.dumps(details.configuration, indent=2)}")
        
        return details
        
    except Exception as e:
        print(f"Failed to get component details: {e}")
        return None

def restart_component(component_name):
    """Restart a Greengrass component."""
    try:
        operation = ipc_client.new_restart_component()
        
        request = {
            "componentName": component_name
        }
        
        operation.activate(request)
        result = operation.get_response().result()
        
        print(f"Restart status: {result.restart_status}")
        if result.message:
            print(f"Message: {result.message}")
            
        return result.restart_status == "SUCCEEDED"
        
    except Exception as e:
        print(f"Failed to restart component: {e}")
        return False

# Usage
components = list_all_components()
if components:
    details = get_component_details(components[0].component_name)
    
    # Restart a component (be careful with this!)
    # restart_component("YourComponentName")

Local Pub/Sub

from awsiot.greengrasscoreipc import connect
import json
import time
import threading

ipc_client = connect()

def publish_to_local_topic(topic, message):
    """Publish message to local Greengrass topic."""
    try:
        operation = ipc_client.new_publish_to_topic()
        
        request = {
            "topic": topic,
            "publishMessage": {
                "jsonMessage": message
            }
        }
        
        operation.activate(request)
        result = operation.get_response().result()
        
        print(f"Published to topic {topic}: {message}")
        return True
        
    except Exception as e:
        print(f"Failed to publish: {e}")
        return False

def subscribe_to_local_topic(topic, callback):
    """Subscribe to local Greengrass topic."""
    try:
        operation = ipc_client.new_subscribe_to_topic()
        
        request = {
            "topic": topic
        }
        
        def stream_handler(event):
            """Handle incoming messages."""
            try:
                if hasattr(event, 'json_message') and event.json_message:
                    callback(topic, event.json_message)
                elif hasattr(event, 'binary_message') and event.binary_message:
                    callback(topic, event.binary_message.message)
            except Exception as e:
                print(f"Error handling message: {e}")
        
        # Activate subscription
        operation.activate(request)
        
        # Set up message handler
        operation.get_response().add_done_callback(
            lambda future: print(f"Subscription to {topic} established")
        )
        
        # Handle stream events
        stream = operation.get_response().result()
        while True:
            try:
                event = stream.get_next_message().result(timeout=1.0)
                stream_handler(event)
            except Exception:
                # Timeout or stream closed
                break
                
    except Exception as e:
        print(f"Failed to subscribe to topic: {e}")

def message_handler(topic, message):
    """Handle received messages."""
    print(f"Received on {topic}: {message}")

# Example usage
def pub_sub_example():
    """Example of local pub/sub communication."""
    
    # Start subscriber in background thread
    subscribe_thread = threading.Thread(
        target=subscribe_to_local_topic,
        args=("sensor/temperature", message_handler)
    )
    subscribe_thread.daemon = True
    subscribe_thread.start()
    
    # Give subscriber time to connect
    time.sleep(1)
    
    # Publish some messages
    for i in range(5):
        message = {
            "temperature": 20.0 + i,
            "humidity": 45.0 + i * 2,
            "timestamp": time.time()
        }
        
        publish_to_local_topic("sensor/temperature", message)
        time.sleep(2)

# Run example
pub_sub_example()

Configuration Management

from awsiot.greengrasscoreipc import connect
import json

ipc_client = connect()

def get_component_configuration(component_name, key_path=None):
    """Get component configuration."""
    try:
        operation = ipc_client.new_get_configuration()
        
        request = {
            "componentName": component_name
        }
        
        if key_path:
            request["keyPath"] = key_path
        
        operation.activate(request)
        result = operation.get_response().result()
        
        print(f"Configuration for {component_name}:")
        print(json.dumps(result.value, indent=2))
        
        return result.value
        
    except Exception as e:
        print(f"Failed to get configuration: {e}")
        return None

def update_component_configuration(key_path, value_to_merge):
    """Update component configuration."""
    try:
        operation = ipc_client.new_update_configuration()
        
        request = {
            "keyPath": key_path,
            "valueToMerge": value_to_merge
        }
        
        operation.activate(request)
        result = operation.get_response().result()
        
        print("Configuration updated successfully")
        return True
        
    except Exception as e:
        print(f"Failed to update configuration: {e}")
        return False

def subscribe_to_config_updates():
    """Subscribe to configuration update events."""
    try:
        operation = ipc_client.new_subscribe_to_configuration_update()
        
        request = {}
        
        def config_update_handler(event):
            """Handle configuration update events."""
            print(f"Configuration updated:")
            print(f"  Component: {event.component_name}")
            print(f"  Key path: {event.key_path}")
        
        operation.activate(request)
        
        # Handle stream events
        stream = operation.get_response().result()
        while True:
            try:
                event = stream.get_next_message().result(timeout=30.0)
                config_update_handler(event)
            except Exception:
                break
                
    except Exception as e:
        print(f"Failed to subscribe to config updates: {e}")

# Usage examples
config = get_component_configuration("MyComponent")

# Update configuration
update_component_configuration(
    ["mqtt", "timeout"],
    {"connectionTimeout": 30}
)

Device Shadow Operations

from awsiot.greengrasscoreipc import connect
import json

ipc_client = connect()

def get_device_shadow(thing_name, shadow_name=None):
    """Get device shadow."""
    try:
        operation = ipc_client.new_get_thing_shadow()
        
        request = {
            "thingName": thing_name
        }
        
        if shadow_name:
            request["shadowName"] = shadow_name
        
        operation.activate(request)
        result = operation.get_response().result()
        
        # Parse shadow payload
        shadow_data = json.loads(result.payload.decode())
        print(f"Shadow for {thing_name}:")
        print(json.dumps(shadow_data, indent=2))
        
        return shadow_data
        
    except Exception as e:
        print(f"Failed to get shadow: {e}")
        return None

def update_device_shadow(thing_name, shadow_state, shadow_name=None):
    """Update device shadow."""
    try:
        operation = ipc_client.new_update_thing_shadow()
        
        shadow_document = {
            "state": shadow_state
        }
        
        request = {
            "thingName": thing_name,
            "payload": json.dumps(shadow_document).encode()
        }
        
        if shadow_name:
            request["shadowName"] = shadow_name
        
        operation.activate(request)
        result = operation.get_response().result()
        
        # Parse response
        response_data = json.loads(result.payload.decode())
        print(f"Shadow updated for {thing_name}:")
        print(json.dumps(response_data, indent=2))
        
        return response_data
        
    except Exception as e:
        print(f"Failed to update shadow: {e}")
        return None

# Usage
shadow = get_device_shadow("MyGreengrassDevice")

# Update shadow
updated_shadow = update_device_shadow(
    "MyGreengrassDevice",
    {
        "reported": {
            "temperature": 23.5,
            "status": "online",
            "last_update": "2023-12-07T10:30:00Z"
        }
    }
)

Secrets Management

from awsiot.greengrasscoreipc import connect

ipc_client = connect()

def get_secret(secret_id, version_id=None, version_stage=None):
    """Get secret value from AWS Secrets Manager."""
    try:
        operation = ipc_client.new_get_secret_value()
        
        request = {
            "secretId": secret_id
        }
        
        if version_id:
            request["versionId"] = version_id
        if version_stage:
            request["versionStage"] = version_stage
        
        operation.activate(request)
        result = operation.get_response().result()
        
        print(f"Retrieved secret: {result.secret_id}")
        
        if result.secret_string:
            return result.secret_string
        elif result.secret_binary:
            return result.secret_binary
        else:
            return None
            
    except Exception as e:
        print(f"Failed to get secret: {e}")
        return None

# Usage
database_password = get_secret("prod/database/password")
if database_password:
    print("Database password retrieved successfully")
    # Use the password in your application

Install with Tessl CLI

npx tessl i tessl/pypi-awsiotsdk

docs

device-shadow.md

fleet-provisioning.md

greengrass-discovery.md

greengrass-ipc.md

index.md

iot-jobs.md

mqtt-connections.md

tile.json