SDK for connecting to AWS IoT using Python.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Discovery service for finding and connecting to AWS Greengrass Core devices in local networks. Enables devices to discover Greengrass cores, retrieve connectivity information, and obtain CA certificates for establishing secure connections within the Greengrass group.
Create a discovery client to find Greengrass cores and retrieve connectivity information for local communication.
class DiscoveryInfoProvider:
    """Greengrass discovery client (API signature reference; method bodies are omitted)."""

    def __init__(self, caPath: str = "", certPath: str = "", keyPath: str = "", host: str = "", port: int = 8443, timeoutSec: int = 120):
        """
        Create Greengrass discovery client.

        Args:
            caPath (str): Path to root CA certificate file
            certPath (str): Path to device certificate file
            keyPath (str): Path to private key file
            host (str): Discovery service endpoint hostname
            port (int): Discovery service port (default 8443)
            timeoutSec (int): Discovery request timeout in seconds
        """

    # Configure discovery endpoint, credentials, and timeout settings.

    def configureEndpoint(self, host: str, port: int = 8443):
        """
        Configure discovery service endpoint.

        Args:
            host (str): Discovery service hostname
            port (int): Discovery service port (default 8443)
        """

    def configureCredentials(self, caPath: str, certPath: str, keyPath: str):
        """
        Configure X.509 certificate credentials for discovery authentication.

        Args:
            caPath (str): Path to root CA certificate file
            certPath (str): Path to device certificate file
            keyPath (str): Path to private key file
        """

    def configureTimeout(self, timeoutSec: int):
        """
        Configure discovery request timeout.

        Args:
            timeoutSec (int): Timeout in seconds for discovery requests
        """

    # Perform discovery requests to find Greengrass cores and retrieve connectivity information.

    def discover(self, thingName: str) -> 'DiscoveryInfo':
        """
        Discover Greengrass cores for the specified thing.

        Args:
            thingName (str): AWS IoT thing name to discover cores for

        Returns:
            DiscoveryInfo: Discovery results with core connectivity information

        Raises:
            DiscoveryTimeoutException: Discovery request timed out
            DiscoveryUnauthorizedException: Authentication failed
            DiscoveryDataNotFoundException: No discovery data found for thing
        """

# Data structures for managing Greengrass discovery information and connectivity details.
Container for all discovery information including cores, groups, and CA certificates.
class DiscoveryInfo:
    """Discovery result container (API signature reference; method bodies are omitted)."""

    def getAllCores(self) -> list:
        """
        Get all core connectivity information.

        Returns:
            list: List of CoreConnectivityInfo objects
        """

    def getAllCas(self) -> list:
        """
        Get all CA certificates from discovery.

        Returns:
            list: List of (groupId, caContent) tuples
        """

    def getAllGroups(self) -> list:
        """
        Get all group connectivity information.

        Returns:
            list: List of GroupConnectivityInfo objects
        """

    def toObjectAtGroupLevel(self) -> dict:
        """
        Convert discovery info to group-level dictionary structure.

        Returns:
            dict: Dictionary with groupId keys and GroupConnectivityInfo values
        """

# Connectivity information for a Greengrass group containing cores and CA certificates.
class GroupConnectivityInfo:
    """Per-group connectivity record (API signature reference; method bodies are omitted)."""

    # Properties
    groupId: str  # Greengrass group ID
    coreConnectivityInfoList: list  # List of CoreConnectivityInfo objects
    caList: list  # List of CA certificates for this group

    def getCoreConnectivityInfo(self, coreThingArn: str) -> 'CoreConnectivityInfo':
        """
        Get connectivity info for specific core by ARN.

        Args:
            coreThingArn (str): Core thing ARN to find

        Returns:
            CoreConnectivityInfo: Connectivity info for the specified core
        """

    def appendCoreConnectivityInfo(self, coreConnectivityInfo: 'CoreConnectivityInfo'):
        """
        Add core connectivity information to this group.

        Args:
            coreConnectivityInfo (CoreConnectivityInfo): Core connectivity info to add
        """

    def appendCa(self, ca: str):
        """
        Add CA certificate to this group.

        Args:
            ca (str): CA certificate content to add
        """

# Connectivity information for a specific Greengrass core device.
class CoreConnectivityInfo:
    """Per-core connectivity record (API signature reference; method bodies are omitted)."""

    # Properties
    coreThingArn: str  # Core thing ARN
    groupId: str  # Associated group ID
    connectivityInfoList: list  # List of ConnectivityInfo objects

    def getConnectivityInfo(self, id: str) -> 'ConnectivityInfo':
        """
        Get connectivity info by ID.

        Args:
            id (str): Connectivity info ID to find

        Returns:
            ConnectivityInfo: Connectivity information for the specified ID
        """

    def appendConnectivityInfo(self, connectivityInfo: 'ConnectivityInfo'):
        """
        Add connectivity information to this core.

        Args:
            connectivityInfo (ConnectivityInfo): Connectivity info to add
        """

# Individual connectivity endpoint information for reaching a Greengrass core.
class ConnectivityInfo:
    """Single connectivity endpoint record (attribute reference only)."""

    # Properties
    id: str  # Connectivity info ID
    host: str  # Hostname or IP address
    port: int  # Port number
    metadata: dict  # Additional metadata


from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.exception.AWSIoTExceptions import DiscoveryTimeoutException
import json
# Create discovery provider
discoveryInfoProvider = DiscoveryInfoProvider()

# Configure discovery endpoint
discoveryInfoProvider.configureEndpoint("greengrass-ats.iot.region.amazonaws.com")

# Configure credentials (root CA, device certificate, private key)
discoveryInfoProvider.configureCredentials(
    "rootCA.crt",
    "certificate.crt",
    "private.key",
)

# Configure timeout
discoveryInfoProvider.configureTimeout(10)

try:
    # Perform discovery
    discoveryInfo = discoveryInfoProvider.discover("myThingName")

    # Get all discovered cores
    cores = discoveryInfo.getAllCores()
    print(f"Discovered {len(cores)} Greengrass cores")

    # Get all CA certificates
    cas = discoveryInfo.getAllCas()
    print(f"Retrieved {len(cas)} CA certificates")

    # Print discovery results: every endpoint advertised by every core
    for core in cores:
        print(f"Core: {core.coreThingArn}")
        print(f"Group: {core.groupId}")
        for conn_info in core.connectivityInfoList:
            print(f" Endpoint: {conn_info.host}:{conn_info.port}")
except DiscoveryTimeoutException:
    print("Discovery request timed out")
except Exception as e:
    # Top-level boundary of the example: report any other failure and stop.
    print(f"Discovery failed: {e}")

from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
import json
import ssl
import tempfile
import os
# Perform discovery
discoveryInfoProvider = DiscoveryInfoProvider()
discoveryInfoProvider.configureEndpoint("greengrass-ats.iot.region.amazonaws.com")
discoveryInfoProvider.configureCredentials("rootCA.crt", "certificate.crt", "private.key")
discoveryInfo = discoveryInfoProvider.discover("myDevice")

# Get group-level information
groups = discoveryInfo.toObjectAtGroupLevel()

for group_id, group_info in groups.items():
    print(f"Processing group: {group_id}")

    ca_files = []
    try:
        # Write group CA certificates to temporary files.
        # GroupConnectivityInfo.caList holds the CA certificate contents
        # themselves (see appendCa(ca: str)); only DiscoveryInfo.getAllCas()
        # yields (groupId, caContent) tuples, so no tuple unpacking here.
        for ca_content in group_info.caList:
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.crt') as ca_file:
                # Record the path before writing so a failed write is still cleaned up.
                ca_files.append(ca_file.name)
                ca_file.write(ca_content)

        # Try to connect to each core in the group
        for core in group_info.coreConnectivityInfoList:
            print(f"Trying to connect to core: {core.coreThingArn}")

            # Try each connectivity option for this core
            for conn_info in core.connectivityInfoList:
                try:
                    print(f"Attempting connection to {conn_info.host}:{conn_info.port}")

                    # Create MQTT client for this core
                    client = AWSIoTPyMQTT.AWSIoTMQTTClient("greengrassDevice")
                    client.configureEndpoint(conn_info.host, conn_info.port)

                    # Use the first CA file for this group
                    if ca_files:
                        client.configureCredentials(ca_files[0], "private.key", "certificate.crt")

                    # Configure connection parameters
                    client.configureConnectDisconnectTimeout(10)
                    client.configureMQTTOperationTimeout(5)

                    # Attempt connection
                    if client.connect():
                        print(f"Successfully connected to {conn_info.host}:{conn_info.port}")

                        # Test basic MQTT operations
                        client.publish("greengrass/test", f"Hello from {conn_info.host}", 0)
                        client.disconnect()

                        # Connection successful: end the script. SystemExit is not
                        # an Exception subclass, so it passes the handler below and
                        # the `finally` clause removes the CA files on the way out.
                        raise SystemExit
                except Exception as e:
                    # Best effort: report the failure and move on to the next endpoint.
                    print(f"Failed to connect to {conn_info.host}:{conn_info.port}: {e}")
    finally:
        # Clean up CA files whether a connection succeeded, every attempt
        # failed, or an unexpected error interrupted the loop.
        for ca_path in ca_files:
            os.unlink(ca_path)

from AWSIoTPythonSDK.core.greengrass.discovery.providers import DiscoveryInfoProvider
from AWSIoTPythonSDK.exception.AWSIoTExceptions import (
DiscoveryTimeoutException,
DiscoveryUnauthorizedException,
DiscoveryDataNotFoundException,
DiscoveryThrottlingException,
DiscoveryFailure
)
import time
import json
def perform_discovery_with_retry(thing_name, max_retries=3, retry_delay=5):
    """Perform discovery with retry logic and comprehensive error handling.

    Args:
        thing_name: AWS IoT thing name passed to ``discover``.
        max_retries: Maximum number of discovery attempts.
        retry_delay: Seconds to wait between attempts; doubled per attempt
            when the request was throttled.

    Returns:
        The discovery result, once a response containing at least one core
        and one CA certificate has been received.

    Raises:
        Exception: When no attempt succeeded. Unauthorized and data-not-found
            errors abort immediately without retrying. The last underlying
            error is attached as ``__cause__``.
    """
    discoveryInfoProvider = DiscoveryInfoProvider()
    discoveryInfoProvider.configureEndpoint("greengrass-ats.iot.region.amazonaws.com", 8443)
    discoveryInfoProvider.configureCredentials("rootCA.crt", "certificate.crt", "private.key")
    discoveryInfoProvider.configureTimeout(30)

    attempts_made = 0
    last_error = None  # Most recent failure, chained onto the final exception.

    for attempt in range(max_retries):
        attempts_made = attempt + 1
        has_attempts_left = attempt < max_retries - 1
        try:
            print(f"Discovery attempt {attempt + 1}/{max_retries} for thing: {thing_name}")
            discoveryInfo = discoveryInfoProvider.discover(thing_name)

            # Validate discovery results. These failures are raised as plain
            # Exceptions, so the generic handler below retries them.
            cores = discoveryInfo.getAllCores()
            if not cores:
                raise Exception("No Greengrass cores found in discovery response")
            cas = discoveryInfo.getAllCas()
            if not cas:
                raise Exception("No CA certificates found in discovery response")

            print(f"Discovery successful: {len(cores)} cores, {len(cas)} CAs")
            return discoveryInfo
        except DiscoveryTimeoutException as e:
            last_error = e
            print(f"Discovery timeout on attempt {attempt + 1}")
            if has_attempts_left:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
        except DiscoveryUnauthorizedException as e:
            last_error = e
            print("Discovery unauthorized - check credentials and thing permissions")
            break  # Don't retry auth failures
        except DiscoveryDataNotFoundException as e:
            last_error = e
            print(f"No discovery data found for thing: {thing_name}")
            print("Ensure thing is associated with a Greengrass group")
            break  # Don't retry data not found
        except DiscoveryThrottlingException as e:
            last_error = e
            print("Discovery request throttled")
            if has_attempts_left:
                backoff_delay = retry_delay * (2 ** attempt)  # Exponential backoff
                print(f"Backing off for {backoff_delay} seconds...")
                time.sleep(backoff_delay)
        except DiscoveryFailure as e:
            last_error = e
            print(f"Discovery service failure: {e}")
            if has_attempts_left:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
        except Exception as e:
            last_error = e
            print(f"Unexpected discovery error: {e}")
            if has_attempts_left:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)

    # Report the attempts actually made (a non-retryable error stops early)
    # and keep the underlying failure reachable for callers and tracebacks.
    noun = "attempt" if attempts_made == 1 else "attempts"
    raise Exception(f"Discovery failed after {attempts_made} {noun}") from last_error
def analyze_discovery_results(discoveryInfo):
    """Analyze and display discovery results in detail.

    Prints a per-group summary (core count, CA count, and every connectivity
    option of every core), followed by a recommendation based on the total
    number of discovered cores.

    Args:
        discoveryInfo: Discovery result providing ``toObjectAtGroupLevel()``
            and ``getAllCores()``.
    """
    print("=== Discovery Results Analysis ===")

    # Group-level analysis
    groups = discoveryInfo.toObjectAtGroupLevel()
    print(f"Found {len(groups)} Greengrass groups")

    for group_id, group_info in groups.items():
        print(f"\nGroup: {group_id}")
        print(f" Cores: {len(group_info.coreConnectivityInfoList)}")
        print(f" CA Certificates: {len(group_info.caList)}")

        # Analyze each core
        for i, core in enumerate(group_info.coreConnectivityInfoList):
            print(f" Core {i+1}: {core.coreThingArn}")
            print(f" Connectivity options: {len(core.connectivityInfoList)}")

            # Analyze connectivity options
            for j, conn_info in enumerate(core.connectivityInfoList):
                print(f" Option {j+1}: {conn_info.host}:{conn_info.port}")
                # Metadata is only printed when it is non-empty.
                if conn_info.metadata:
                    print(f" Metadata: {json.dumps(conn_info.metadata, indent=8)}")

    # Provide recommendations
    print("\n=== Recommendations ===")
    total_cores = len(discoveryInfo.getAllCores())
    if total_cores > 1:
        print("Multiple cores available - consider load balancing or failover")
    elif total_cores == 1:
        print("Single core available - ensure high availability setup")
    else:
        print("No cores found - check Greengrass group configuration")
# Example usage: run discovery with retries, then print the analysis.
try:
    discovery_results = perform_discovery_with_retry("myIoTDevice")
    analyze_discovery_results(discovery_results)
except Exception as e:
    # Top-level boundary of the example: report the failure instead of crashing.
    print(f"Discovery completely failed: {e}")

# Discovery exception types
class DiscoveryTimeoutException(Exception):
    """Raised when discovery request times out"""
class DiscoveryUnauthorizedException(Exception):
    """Raised when discovery authentication fails"""
class DiscoveryDataNotFoundException(Exception):
    """Raised when no discovery data found for thing"""
class DiscoveryThrottlingException(Exception):
    """Raised when discovery requests are throttled"""
class DiscoveryFailure(Exception):
    """Raised for general discovery service failures"""
# Discovery result structure: groups, each listing its cores (with their
# connectivity endpoints) and the group's CA certificates.
discovery_response = {
    "GGGroups": [
        {
            "GGGroupId": "group-12345",
            "Cores": [
                {
                    "thingArn": "arn:aws:iot:region:account:thing/core-thing",
                    "Connectivity": [
                        {
                            "Id": "connection-1",
                            "HostAddress": "192.168.1.100",
                            "PortNumber": 8883,
                            "Metadata": {
                                "network": "local"
                            }
                        }
                    ]
                }
            ],
            "CAs": [
                "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----"
            ]
        }
    ]
}

# Install with Tessl CLI
npx tessl i tessl/pypi-awsiotpythonsdk