CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-awsiotsdk

AWS IoT SDK based on the AWS Common Runtime for connecting IoT devices to AWS IoT Core services

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

greengrass-discovery.mddocs/

Greengrass Discovery

Greengrass Core discovery functionality for edge computing scenarios including connectivity information retrieval and core device discovery for local communication setup in AWS IoT Greengrass environments.

Capabilities

Discovery Client

The DiscoveryClient enables devices to discover nearby Greengrass Core devices and their connectivity information for local communication.

Client Creation

class DiscoveryClient:
    """
    Client for Greengrass discovery operations.
    
    Parameters:
    - bootstrap: Client bootstrap for networking
    - socket_options: TCP socket options
    - tls_context: TLS context for secure connection
    - region (str): AWS region
    - gg_server_name (str): Greengrass server name (optional)
    - proxy_options: HTTP proxy options (optional)
    """
    def __init__(self, bootstrap, socket_options, tls_context, region, gg_server_name=None, proxy_options=None): ...

Discovery Operations

def discover(self, thing_name):
    """
    Discover Greengrass Core devices for the specified thing.
    
    Parameters:
    - thing_name (str): Name of the IoT thing to discover cores for
    
    Returns:
    Future[DiscoverResponse]: Future containing discovery response with core information
    """

Data Model Classes

Discovery Response

@dataclass
class DiscoverResponse:
    """
    Response from Greengrass discovery operation containing core information.
    """
    gg_groups: Optional[List[GGGroup]] = None

Greengrass Group

@dataclass
class GGGroup:
    """
    Greengrass group containing core devices and their connectivity information.
    """
    gg_group_id: Optional[str] = None
    cores: Optional[List[GGCore]] = None
    certificate_authorities: Optional[List[str]] = None

Greengrass Core

@dataclass  
class GGCore:
    """
    Greengrass Core device with connectivity endpoints.
    """
    thing_arn: Optional[str] = None
    connectivity: Optional[List[ConnectivityInfo]] = None

Connectivity Information

@dataclass
class ConnectivityInfo:
    """
    Connectivity information for reaching a Greengrass Core.
    """
    id: Optional[str] = None
    host_address: Optional[str] = None
    port: Optional[int] = None
    metadata: Optional[str] = None

Exception Classes

class DiscoveryException(Exception):
    """
    Exception raised during Greengrass discovery operations.
    
    Attributes:
    - message (str): Error message
    - status_code (int): HTTP status code from discovery service
    """
    def __init__(self, message, status_code=None): ...

Usage Examples

Basic Discovery

from awsiot import greengrass_discovery
from awscrt import io, auth, http, mqtt
import json

# Create event loop group and host resolver
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)

# Create client bootstrap
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

# Create socket options
socket_options = io.SocketOptions()

# Create TLS context with device certificate
tls_context_options = io.TlsContextOptions.create_client_with_mtls_from_path(
    cert_filepath="/path/to/device-certificate.pem.crt",
    pri_key_filepath="/path/to/device-private.pem.key"
)
tls_context = io.ClientTlsContext(tls_context_options)

# Create discovery client
discovery_client = greengrass_discovery.DiscoveryClient(
    bootstrap=client_bootstrap,
    socket_options=socket_options,
    tls_context=tls_context,
    region="us-east-1"
)

def discover_cores(thing_name):
    """Discover Greengrass cores for a device."""
    try:
        print(f"Discovering Greengrass cores for thing: {thing_name}")
        
        # Perform discovery
        discover_future = discovery_client.discover(thing_name)
        discover_response = discover_future.result(timeout=30)
        
        print(f"Discovery successful! Found {len(discover_response.gg_groups)} groups")
        
        for group in discover_response.gg_groups:
            print(f"\nGroup ID: {group.gg_group_id}")
            print(f"Certificate Authorities: {len(group.certificate_authorities)}")
            
            for i, ca_pem in enumerate(group.certificate_authorities):
                # Save CA certificates for later use
                with open(f"/tmp/gg-ca-{i}.pem", "w") as f:
                    f.write(ca_pem)
                print(f"  Saved CA certificate {i} to /tmp/gg-ca-{i}.pem")
            
            print(f"Cores: {len(group.cores)}")
            for core in group.cores:
                print(f"  Core ARN: {core.thing_arn}")
                print(f"  Connectivity options: {len(core.connectivity)}")
                
                for conn in core.connectivity:
                    print(f"    ID: {conn.id}")
                    print(f"    Host: {conn.host_address}:{conn.port}")
                    if conn.metadata:
                        print(f"    Metadata: {conn.metadata}")
        
        return discover_response
        
    except greengrass_discovery.DiscoveryException as e:
        print(f"Discovery failed: {e.message}")
        if e.status_code:
            print(f"Status code: {e.status_code}")
        return None
    except Exception as e:
        print(f"Discovery error: {e}")
        return None

# Discover cores
discovery_response = discover_cores("MyGreengrassDevice")

Connect to Discovered Core

from awsiot import greengrass_discovery, mqtt_connection_builder
from awscrt import io, mqtt
import json

def connect_to_greengrass_core(thing_name):
    """Discover and connect to a Greengrass core."""
    
    # Perform discovery
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    socket_options = io.SocketOptions()
    
    tls_context_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        cert_filepath="/path/to/device-certificate.pem.crt",
        pri_key_filepath="/path/to/device-private.pem.key"
    )
    tls_context = io.ClientTlsContext(tls_context_options)
    
    discovery_client = greengrass_discovery.DiscoveryClient(
        bootstrap=client_bootstrap,
        socket_options=socket_options,
        tls_context=tls_context,
        region="us-east-1"
    )
    
    try:
        # Discover cores
        discover_future = discovery_client.discover(thing_name)
        discover_response = discover_future.result(timeout=30)
        
        if not discover_response.gg_groups:
            print("No Greengrass groups found")
            return None
        
        # Try to connect to the first available core
        for group in discover_response.gg_groups:
            if not group.cores:
                continue
                
            # Save CA certificates
            ca_filepath = "/tmp/gg-ca.pem"
            with open(ca_filepath, "w") as f:
                for ca_pem in group.certificate_authorities:
                    f.write(ca_pem)
            
            for core in group.cores:
                if not core.connectivity:
                    continue
                
                # Try each connectivity option
                for conn_info in core.connectivity:
                    try:
                        print(f"Attempting connection to {conn_info.host_address}:{conn_info.port}")
                        
                        # Create MQTT connection to Greengrass core
                        connection = mqtt_connection_builder.mtls_from_path(
                            endpoint=conn_info.host_address,
                            port=conn_info.port,
                            cert_filepath="/path/to/device-certificate.pem.crt",
                            pri_key_filepath="/path/to/device-private.pem.key",
                            ca_filepath=ca_filepath,
                            client_id=thing_name,
                            clean_session=False,
                            keep_alive_secs=30
                        )
                        
                        # Connect
                        connect_future = connection.connect()
                        connect_future.result(timeout=10)
                        
                        print(f"Successfully connected to Greengrass core at {conn_info.host_address}:{conn_info.port}")
                        
                        # Test communication
                        test_communication(connection, thing_name)
                        
                        # Disconnect
                        disconnect_future = connection.disconnect()
                        disconnect_future.result()
                        
                        return connection
                        
                    except Exception as e:
                        print(f"Failed to connect to {conn_info.host_address}:{conn_info.port}: {e}")
                        continue
        
        print("Failed to connect to any discovered Greengrass cores")
        return None
        
    except Exception as e:
        print(f"Discovery and connection failed: {e}")
        return None

def test_communication(connection, thing_name):
    """Test communication with Greengrass core."""
    
    def on_message_received(topic, payload, dup, qos, retain, **kwargs):
        print(f"Received message on topic {topic}: {payload.decode()}")
    
    # Subscribe to a local topic
    local_topic = f"greengrass/device/{thing_name}/data"
    subscribe_future, _ = connection.subscribe(
        topic=local_topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received
    )
    subscribe_future.result()
    
    # Publish a test message
    test_message = {
        "device_id": thing_name,
        "message": "Hello from device!",
        "timestamp": "2023-12-07T10:30:00Z"
    }
    
    publish_future, _ = connection.publish(
        topic=local_topic,
        payload=json.dumps(test_message),
        qos=mqtt.QoS.AT_LEAST_ONCE
    )
    publish_future.result()
    
    print("Test message published to Greengrass core")

# Connect to Greengrass core
connection = connect_to_greengrass_core("MyGreengrassDevice")

Discovery with Connection Fallback

from awsiot import greengrass_discovery, mqtt_connection_builder
import time

class GreengrassConnector:
    def __init__(self, thing_name, cert_path, key_path, region):
        self.thing_name = thing_name
        self.cert_path = cert_path
        self.key_path = key_path
        self.region = region
        self.current_connection = None
        self.discovered_cores = []
    
    def discover_and_connect(self):
        """Discover cores and establish connection with fallback."""
        
        # Perform discovery
        discovery_response = self._discover_cores()
        if not discovery_response:
            return False
        
        # Extract all connectivity options
        self.discovered_cores = []
        for group in discovery_response.gg_groups:
            # Save CA certificates
            ca_filepath = f"/tmp/gg-ca-{group.gg_group_id}.pem"
            with open(ca_filepath, "w") as f:
                for ca_pem in group.certificate_authorities:
                    f.write(ca_pem)
            
            for core in group.cores:
                for conn_info in core.connectivity:
                    self.discovered_cores.append({
                        'host': conn_info.host_address,
                        'port': conn_info.port,
                        'ca_file': ca_filepath,
                        'core_arn': core.thing_arn,
                        'group_id': group.gg_group_id
                    })
        
        # Sort by preference (you can implement your own logic)
        self.discovered_cores.sort(key=lambda x: x['port'])  # Prefer certain ports
        
        # Try to connect to cores in order
        for core_info in self.discovered_cores:
            if self._try_connect(core_info):
                return True
        
        print("Failed to connect to any discovered Greengrass cores")
        return False
    
    def _discover_cores(self):
        """Perform Greengrass discovery."""
        try:
            # Setup discovery client
            event_loop_group = io.EventLoopGroup(1)
            host_resolver = io.DefaultHostResolver(event_loop_group)
            client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
            socket_options = io.SocketOptions()
            
            tls_context_options = io.TlsContextOptions.create_client_with_mtls_from_path(
                cert_filepath=self.cert_path,
                pri_key_filepath=self.key_path
            )
            tls_context = io.ClientTlsContext(tls_context_options)
            
            discovery_client = greengrass_discovery.DiscoveryClient(
                bootstrap=client_bootstrap,
                socket_options=socket_options,
                tls_context=tls_context,
                region=self.region
            )
            
            # Perform discovery
            discover_future = discovery_client.discover(self.thing_name)
            return discover_future.result(timeout=30)
            
        except Exception as e:
            print(f"Discovery failed: {e}")
            return None
    
    def _try_connect(self, core_info, timeout=10):
        """Try to connect to a specific core."""
        try:
            print(f"Trying to connect to {core_info['host']}:{core_info['port']}")
            
            connection = mqtt_connection_builder.mtls_from_path(
                endpoint=core_info['host'],
                port=core_info['port'],
                cert_filepath=self.cert_path,
                pri_key_filepath=self.key_path,
                ca_filepath=core_info['ca_file'],
                client_id=self.thing_name,
                clean_session=False,
                keep_alive_secs=30
            )
            
            # Try to connect
            connect_future = connection.connect()
            connect_future.result(timeout=timeout)
            
            self.current_connection = connection
            print(f"Connected to Greengrass core: {core_info['core_arn']}")
            return True
            
        except Exception as e:
            print(f"Connection failed to {core_info['host']}:{core_info['port']}: {e}")
            return False
    
    def reconnect(self):
        """Reconnect to Greengrass core with failover."""
        if self.current_connection:
            try:
                self.current_connection.disconnect().result()
            except:
                pass
        
        # Try current core first, then others
        for core_info in self.discovered_cores:
            if self._try_connect(core_info):
                return True
        
        # If all cores fail, try discovery again
        print("All known cores failed, attempting rediscovery...")
        return self.discover_and_connect()
    
    def disconnect(self):
        """Disconnect from current core."""
        if self.current_connection:
            try:
                disconnect_future = self.current_connection.disconnect()
                disconnect_future.result()
                print("Disconnected from Greengrass core")
            except Exception as e:
                print(f"Error during disconnect: {e}")
            finally:
                self.current_connection = None

# Usage
connector = GreengrassConnector(
    thing_name="MyGreengrassDevice",
    cert_path="/path/to/device-certificate.pem.crt",
    key_path="/path/to/device-private.pem.key",
    region="us-east-1"
)

if connector.discover_and_connect():
    print("Successfully connected to Greengrass!")
    
    # Your application logic here
    try:
        # Simulate work
        time.sleep(60)
    except KeyboardInterrupt:
        pass
    finally:
        connector.disconnect()
else:
    print("Failed to connect to Greengrass")

Discovery Configuration

To use Greengrass discovery, ensure your device certificate has the necessary permissions:

  1. IoT Policy: Attach a policy that allows discovery:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "greengrass:Discover"
      ],
      "Resource": "*"
    }
  ]
}
  1. Thing Association: The device thing must be associated with a Greengrass group.

  2. Core Configuration: The Greengrass core must be properly configured and running with connectivity information published.

Install with Tessl CLI

npx tessl i tessl/pypi-awsiotsdk

docs

device-shadow.md

fleet-provisioning.md

greengrass-discovery.md

greengrass-ipc.md

index.md

iot-jobs.md

mqtt-connections.md

tile.json