AWS IoT SDK based on the AWS Common Runtime for connecting IoT devices to AWS IoT Core services
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
This section covers Greengrass Core discovery for edge computing scenarios: retrieving connectivity information and discovering core devices so that a device can set up local communication in an AWS IoT Greengrass environment.
The DiscoveryClient enables devices to discover nearby Greengrass Core devices and their connectivity information for local communication.
class DiscoveryClient:
    """
    Client for Greengrass discovery operations.

    This is a signature stub: the constructor body is elided (``...``) and
    ``discover`` carries only its docstring.

    Parameters:
    - bootstrap: Client bootstrap for networking
    - socket_options: TCP socket options
    - tls_context: TLS context for secure connection
    - region (str): AWS region
    - gg_server_name (str): Greengrass server name (optional)
    - proxy_options: HTTP proxy options (optional)
    """

    def __init__(self, bootstrap, socket_options, tls_context, region, gg_server_name=None, proxy_options=None): ...

    def discover(self, thing_name):
        """
        Discover Greengrass Core devices for the specified thing.

        Parameters:
        - thing_name (str): Name of the IoT thing to discover cores for

        Returns:
        Future[DiscoverResponse]: Future containing discovery response with core information
        """


@dataclass
class DiscoverResponse:
    """
    Response from Greengrass discovery operation containing core information.
    """
    # Defaults to None, so callers should guard before iterating.
    gg_groups: Optional[List[GGGroup]] = None


@dataclass
class GGGroup:
    """
    Greengrass group containing core devices and their connectivity information.
    """
    gg_group_id: Optional[str] = None
    cores: Optional[List[GGCore]] = None
    # The usage examples write each entry verbatim to a .pem file.
    certificate_authorities: Optional[List[str]] = None


@dataclass
class GGCore:
    """
    Greengrass Core device with connectivity endpoints.
    """
    thing_arn: Optional[str] = None
    connectivity: Optional[List[ConnectivityInfo]] = None


@dataclass
class ConnectivityInfo:
    """
    Connectivity information for reaching a Greengrass Core.
    """
    id: Optional[str] = None
    # The usage examples pass host_address/port as the MQTT endpoint and port.
    host_address: Optional[str] = None
    port: Optional[int] = None
    metadata: Optional[str] = None


class DiscoveryException(Exception):
    """
    Exception raised during Greengrass discovery operations.

    Attributes:
    - message (str): Error message
    - status_code (int): HTTP status code from discovery service
    """
    # NOTE(review): the body is elided, so how `message` and `status_code` are
    # stored cannot be seen here - confirm the attribute names against the
    # installed awsiot release before relying on them.
    def __init__(self, message, status_code=None): ...


# Usage example: basic discovery
from awsiot import greengrass_discovery
from awscrt import io, auth, http, mqtt
import json
# Create event loop group and host resolver
# (the argument 1 is presumably the number of event-loop threads - confirm)
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)

# Create client bootstrap
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

# Create socket options
socket_options = io.SocketOptions()

# Create TLS context with device certificate
# (both file paths are placeholders to be replaced with real files)
tls_context_options = io.TlsContextOptions.create_client_with_mtls_from_path(
    cert_filepath="/path/to/device-certificate.pem.crt",
    pri_key_filepath="/path/to/device-private.pem.key"
)
tls_context = io.ClientTlsContext(tls_context_options)

# Create discovery client; this module-level object is what discover_cores()
# calls to run discovery
discovery_client = greengrass_discovery.DiscoveryClient(
    bootstrap=client_bootstrap,
    socket_options=socket_options,
    tls_context=tls_context,
    region="us-east-1"
)
def discover_cores(thing_name):
    """Discover Greengrass cores for a device and print what was found.

    Every certificate authority returned by discovery is written to
    ``/tmp/gg-ca-<n>.pem``. ``<n>`` is a counter that runs across all groups,
    so certificates from a later group never overwrite those saved for an
    earlier one.

    Parameters:
    - thing_name (str): Name of the IoT thing to run discovery for

    Returns:
    The discovery response on success, or None when discovery fails (the
    failure is printed, not raised).
    """
    try:
        print(f"Discovering Greengrass cores for thing: {thing_name}")

        # Perform discovery
        discover_future = discovery_client.discover(thing_name)
        discover_response = discover_future.result(timeout=30)

        # Every list on the response is Optional; treat None as "nothing found"
        # instead of letting len()/iteration raise TypeError.
        groups = discover_response.gg_groups or []
        print(f"Discovery successful! Found {len(groups)} groups")

        ca_index = 0  # runs across groups so saved CA files never collide
        for group in groups:
            ca_pems = group.certificate_authorities or []
            cores = group.cores or []

            print(f"\nGroup ID: {group.gg_group_id}")
            print(f"Certificate Authorities: {len(ca_pems)}")
            for ca_pem in ca_pems:
                # Save CA certificates for later use
                ca_path = f"/tmp/gg-ca-{ca_index}.pem"
                with open(ca_path, "w") as f:
                    f.write(ca_pem)
                print(f" Saved CA certificate {ca_index} to {ca_path}")
                ca_index += 1

            print(f"Cores: {len(cores)}")
            for core in cores:
                connectivity = core.connectivity or []
                print(f" Core ARN: {core.thing_arn}")
                print(f" Connectivity options: {len(connectivity)}")
                for conn in connectivity:
                    print(f" ID: {conn.id}")
                    print(f" Host: {conn.host_address}:{conn.port}")
                    if conn.metadata:
                        print(f" Metadata: {conn.metadata}")
        return discover_response
    except greengrass_discovery.DiscoveryException as e:
        print(f"Discovery failed: {e.message}")
        # Read the status defensively: an AttributeError raised inside this
        # handler would escape both except clauses. The published awsiot SDK
        # names the attribute `http_response_code`; accept either spelling.
        status_code = getattr(e, "status_code", None)
        if status_code is None:
            status_code = getattr(e, "http_response_code", None)
        if status_code:
            print(f"Status code: {status_code}")
        return None
    except Exception as e:
        print(f"Discovery error: {e}")
        return None
# Discover cores
discovery_response = discover_cores("MyGreengrassDevice")

# Usage example: discover, then connect to a core over MQTT
from awsiot import greengrass_discovery, mqtt_connection_builder
from awscrt import io, mqtt
import json
def connect_to_greengrass_core(thing_name):
    """Discover and connect to a Greengrass core.

    Runs discovery for ``thing_name``, then walks the discovered groups, cores
    and connectivity options in order. The first endpoint that accepts an MQTT
    connection is exercised with ``test_communication`` and then disconnected.

    Parameters:
    - thing_name (str): IoT thing name; also used as the MQTT client id

    Returns:
    The MQTT connection object for the first endpoint that worked (it has
    already been disconnected when it is returned), or None when discovery
    fails or no endpoint could be used.
    """
    # Placeholder credential paths, shared by discovery and the MQTT connection
    cert_filepath = "/path/to/device-certificate.pem.crt"
    pri_key_filepath = "/path/to/device-private.pem.key"

    # Perform discovery
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
    socket_options = io.SocketOptions()
    tls_context_options = io.TlsContextOptions.create_client_with_mtls_from_path(
        cert_filepath=cert_filepath,
        pri_key_filepath=pri_key_filepath
    )
    tls_context = io.ClientTlsContext(tls_context_options)
    discovery_client = greengrass_discovery.DiscoveryClient(
        bootstrap=client_bootstrap,
        socket_options=socket_options,
        tls_context=tls_context,
        region="us-east-1"
    )

    try:
        # Discover cores
        discover_future = discovery_client.discover(thing_name)
        discover_response = discover_future.result(timeout=30)
        if not discover_response.gg_groups:
            print("No Greengrass groups found")
            return None

        # Try to connect to the first available core
        for group in discover_response.gg_groups:
            if not group.cores:
                continue

            # Save CA certificates (the list is Optional, so None means none)
            ca_filepath = "/tmp/gg-ca.pem"
            with open(ca_filepath, "w") as f:
                for ca_pem in group.certificate_authorities or []:
                    f.write(ca_pem)
                    # Keep concatenated PEM blocks on separate lines so the
                    # bundle stays parseable when an entry lacks a trailing
                    # newline.
                    if not ca_pem.endswith("\n"):
                        f.write("\n")

            for core in group.cores:
                if not core.connectivity:
                    continue

                # Try each connectivity option
                for conn_info in core.connectivity:
                    try:
                        print(f"Attempting connection to {conn_info.host_address}:{conn_info.port}")

                        # Create MQTT connection to Greengrass core
                        connection = mqtt_connection_builder.mtls_from_path(
                            endpoint=conn_info.host_address,
                            port=conn_info.port,
                            cert_filepath=cert_filepath,
                            pri_key_filepath=pri_key_filepath,
                            ca_filepath=ca_filepath,
                            client_id=thing_name,
                            clean_session=False,
                            keep_alive_secs=30
                        )

                        # Connect
                        connect_future = connection.connect()
                        connect_future.result(timeout=10)
                        print(f"Successfully connected to Greengrass core at {conn_info.host_address}:{conn_info.port}")

                        try:
                            # Test communication
                            test_communication(connection, thing_name)
                        finally:
                            # Disconnect even when the test exchange raises, so
                            # a failed attempt does not leave a live session
                            # behind while the next endpoint is tried.
                            disconnect_future = connection.disconnect()
                            disconnect_future.result()
                        return connection
                    except Exception as e:
                        print(f"Failed to connect to {conn_info.host_address}:{conn_info.port}: {e}")
                        continue

        print("Failed to connect to any discovered Greengrass cores")
        return None
    except Exception as e:
        print(f"Discovery and connection failed: {e}")
        return None
def test_communication(connection, thing_name):
    """Exchange one test message with the Greengrass core.

    Subscribes to the device's local data topic, waits for the subscription to
    complete, then publishes a single JSON message to that same topic and
    waits for the publish to complete.

    Parameters:
    - connection: connected MQTT connection to use
    - thing_name (str): device name; used in the topic and in the payload
    """
    def _print_incoming(topic, payload, dup, qos, retain, **kwargs):
        # Invoked for each message delivered on the subscribed topic
        print(f"Received message on topic {topic}: {payload.decode()}")

    topic_name = f"greengrass/device/{thing_name}/data"
    delivery = mqtt.QoS.AT_LEAST_ONCE

    # Subscribe to a local topic and wait for the broker to acknowledge it
    sub_done, _sub_packet_id = connection.subscribe(
        topic=topic_name,
        qos=delivery,
        callback=_print_incoming
    )
    sub_done.result()

    # Publish a test message
    body = json.dumps({
        "device_id": thing_name,
        "message": "Hello from device!",
        "timestamp": "2023-12-07T10:30:00Z"
    })
    pub_done, _pub_packet_id = connection.publish(
        topic=topic_name,
        payload=body,
        qos=delivery
    )
    pub_done.result()
    print("Test message published to Greengrass core")
# Connect to Greengrass core
connection = connect_to_greengrass_core("MyGreengrassDevice")

# Usage example: connector class with fallback and reconnect
from awsiot import greengrass_discovery, mqtt_connection_builder
import time
class GreengrassConnector:
    """Discover Greengrass cores and keep an MQTT connection to one of them.

    Parameters:
    - thing_name (str): IoT thing name; also used as the MQTT client id
    - cert_path (str): Path to the device certificate
    - key_path (str): Path to the device private key
    - region (str): AWS region used for discovery

    Attributes:
    - current_connection: the active MQTT connection, or None
    - discovered_cores (list[dict]): one entry per discovered endpoint with
      the keys 'host', 'port', 'ca_file', 'core_arn' and 'group_id'
    """

    def __init__(self, thing_name, cert_path, key_path, region):
        self.thing_name = thing_name
        self.cert_path = cert_path
        self.key_path = key_path
        self.region = region
        self.current_connection = None
        self.discovered_cores = []

    def discover_and_connect(self):
        """Discover cores and establish connection with fallback.

        Returns:
        True once a connection to any discovered endpoint succeeds, False when
        discovery fails or every endpoint is unreachable.
        """
        # Perform discovery
        discovery_response = self._discover_cores()
        if not discovery_response:
            return False

        # Extract all connectivity options. Every list on the response is
        # Optional, so None is treated as empty rather than raising TypeError.
        self.discovered_cores = []
        for group in discovery_response.gg_groups or []:
            # Save CA certificates
            ca_filepath = f"/tmp/gg-ca-{group.gg_group_id}.pem"
            with open(ca_filepath, "w") as f:
                for ca_pem in group.certificate_authorities or []:
                    f.write(ca_pem)
                    # Keep concatenated PEM blocks on separate lines
                    if not ca_pem.endswith("\n"):
                        f.write("\n")

            for core in group.cores or []:
                for conn_info in core.connectivity or []:
                    self.discovered_cores.append({
                        'host': conn_info.host_address,
                        'port': conn_info.port,
                        'ca_file': ca_filepath,
                        'core_arn': core.thing_arn,
                        'group_id': group.gg_group_id
                    })

        # Sort by preference (you can implement your own logic).
        # Ports are Optional: entries without a port sort last instead of
        # making the int/None comparison raise.
        self.discovered_cores.sort(
            key=lambda entry: (entry['port'] is None, entry['port'] or 0)
        )

        # Try to connect to cores in order
        for core_info in self.discovered_cores:
            if self._try_connect(core_info):
                return True

        print("Failed to connect to any discovered Greengrass cores")
        return False

    def _discover_cores(self):
        """Perform Greengrass discovery.

        Returns:
        The discovery response, or None when anything goes wrong (the error is
        printed).
        """
        try:
            # This example's own import lines do not bring in awscrt.io, so
            # import it here to keep the class usable on its own.
            from awscrt import io

            # Setup discovery client
            event_loop_group = io.EventLoopGroup(1)
            host_resolver = io.DefaultHostResolver(event_loop_group)
            client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
            socket_options = io.SocketOptions()
            tls_context_options = io.TlsContextOptions.create_client_with_mtls_from_path(
                cert_filepath=self.cert_path,
                pri_key_filepath=self.key_path
            )
            tls_context = io.ClientTlsContext(tls_context_options)
            discovery_client = greengrass_discovery.DiscoveryClient(
                bootstrap=client_bootstrap,
                socket_options=socket_options,
                tls_context=tls_context,
                region=self.region
            )

            # Perform discovery
            discover_future = discovery_client.discover(self.thing_name)
            return discover_future.result(timeout=30)
        except Exception as e:
            print(f"Discovery failed: {e}")
            return None

    def _try_connect(self, core_info, timeout=10):
        """Try to connect to a specific core.

        Parameters:
        - core_info (dict): an entry from ``discovered_cores``
        - timeout: seconds to wait for the connect to complete

        Returns:
        True when connected (``current_connection`` is then set), else False.
        """
        try:
            print(f"Trying to connect to {core_info['host']}:{core_info['port']}")
            connection = mqtt_connection_builder.mtls_from_path(
                endpoint=core_info['host'],
                port=core_info['port'],
                cert_filepath=self.cert_path,
                pri_key_filepath=self.key_path,
                ca_filepath=core_info['ca_file'],
                client_id=self.thing_name,
                clean_session=False,
                keep_alive_secs=30
            )

            # Try to connect
            connect_future = connection.connect()
            connect_future.result(timeout=timeout)

            self.current_connection = connection
            print(f"Connected to Greengrass core: {core_info['core_arn']}")
            return True
        except Exception as e:
            print(f"Connection failed to {core_info['host']}:{core_info['port']}: {e}")
            return False

    def reconnect(self):
        """Reconnect to Greengrass core with failover.

        Returns:
        True when a connection was re-established, False otherwise.
        """
        if self.current_connection:
            try:
                self.current_connection.disconnect().result()
            except Exception as e:
                # Best effort: the old session may already be gone. Catching
                # Exception (not a bare except) lets Ctrl-C still interrupt.
                print(f"Ignoring error while closing previous connection: {e}")
            finally:
                # Never keep a reference to a connection that was just closed
                self.current_connection = None

        # Try current core first, then others
        for core_info in self.discovered_cores:
            if self._try_connect(core_info):
                return True

        # If all cores fail, try discovery again
        print("All known cores failed, attempting rediscovery...")
        return self.discover_and_connect()

    def disconnect(self):
        """Disconnect from current core."""
        if self.current_connection:
            try:
                disconnect_future = self.current_connection.disconnect()
                disconnect_future.result()
                print("Disconnected from Greengrass core")
            except Exception as e:
                print(f"Error during disconnect: {e}")
            finally:
                self.current_connection = None
# Usage
# (certificate and key paths are placeholders to be replaced with real files)
connector = GreengrassConnector(
    thing_name="MyGreengrassDevice",
    cert_path="/path/to/device-certificate.pem.crt",
    key_path="/path/to/device-private.pem.key",
    region="us-east-1"
)

if connector.discover_and_connect():
    print("Successfully connected to Greengrass!")
    # Your application logic here
    try:
        # Simulate work
        time.sleep(60)
    except KeyboardInterrupt:
        # Ctrl-C just ends the simulated work; the finally clause still runs
        pass
    finally:
        connector.disconnect()
else:
    print("Failed to connect to Greengrass")

To use Greengrass discovery, ensure your device certificate has the necessary permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"greengrass:Discover"
],
"Resource": "*"
}
]
}

Thing Association: The device thing must be associated with a Greengrass group.
Core Configuration: The Greengrass core must be properly configured and running with connectivity information published.
Install with Tessl CLI
npx tessl i tessl/pypi-awsiotsdk