SDK for connecting to AWS IoT using Python.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Job execution management for device fleet management, including job subscription, execution state management, progress reporting, and job lifecycle operations. AWS IoT Jobs enable you to define a set of remote operations that can be sent to and executed on one or more devices connected to AWS IoT.
Create a specialized MQTT client for AWS IoT Jobs operations with thing-specific configuration and job management capabilities.
class AWSIoTMQTTThingJobsClient:
    """MQTT client specialised for AWS IoT Jobs operations on a single thing.

    API reference stub: only signatures and contracts are listed here, the
    methods carry no bodies in this document.
    """

    def __init__(
        self,
        clientID: str,
        thingName: str,
        QoS: int = 0,
        protocolType: int = MQTTv3_1_1,
        useWebsocket: bool = False,
        cleanSession: bool = True,
        awsIoTMQTTClient=None,
    ):
        """
        Create AWS IoT MQTT Jobs client for thing-specific job operations.

        Args:
            clientID (str): Client identifier for MQTT connection and job tokens
            thingName (str): AWS IoT thing name for job topic routing
            QoS (int): Default QoS level for all job operations (0 or 1)
            protocolType (int): MQTT version (MQTTv3_1=3, MQTTv3_1_1=4)
            useWebsocket (bool): Enable MQTT over WebSocket SigV4
            cleanSession (bool): Start with clean session state
            awsIoTMQTTClient (AWSIoTMQTTClient): Existing MQTT client to reuse (optional)
        """

    # Subscribe to various job-related topics for monitoring job state changes
    # and receiving job notifications.

    def createJobSubscription(
        self,
        callback: callable,
        jobExecutionType=None,
        jobReplyType=None,
        jobId: str = None,
    ) -> bool:
        """
        Subscribe to job-related topic synchronously.

        Args:
            callback (callable): Message callback (client, userdata, message) -> None
            jobExecutionType: Topic type from jobExecutionTopicType enum
            jobReplyType: Reply type from jobExecutionTopicReplyType enum
            jobId (str): Specific job ID or None for wildcard

        Returns:
            bool: True if subscription successful, False otherwise
        """

    def createJobSubscriptionAsync(
        self,
        ackCallback: callable,
        callback: callable,
        jobExecutionType=None,
        jobReplyType=None,
        jobId: str = None,
    ) -> int:
        """
        Subscribe to job-related topic asynchronously.

        Args:
            ackCallback (callable): SUBACK callback (mid, data) -> None
            callback (callable): Message callback (client, userdata, message) -> None
            jobExecutionType: Topic type from jobExecutionTopicType enum
            jobReplyType: Reply type from jobExecutionTopicReplyType enum
            jobId (str): Specific job ID or None for wildcard

        Returns:
            int: Packet ID for tracking in callback
        """

    # Query job information including pending jobs list and specific job
    # descriptions.

    def sendJobsQuery(self, jobExecTopicType, jobId: str = None) -> bool:
        """
        Send job query request.

        Args:
            jobExecTopicType: Query type from jobExecutionTopicType enum
            jobId (str): Job ID for specific queries, or None for general queries

        Returns:
            bool: True if query sent successfully, False otherwise
        """

    def sendJobsDescribe(
        self,
        jobId: str,
        executionNumber: int = 0,
        includeJobDocument: bool = True,
    ) -> bool:
        """
        Request description of specific job execution.

        Args:
            jobId (str): Job ID to describe (can be '$next' for next pending job)
            executionNumber (int): Specific execution number (0 for latest)
            includeJobDocument (bool): Include job document in response

        Returns:
            bool: True if describe request sent, False otherwise
        """

    # Start, update, and manage job execution lifecycle and status reporting.

    def sendJobsStartNext(
        self,
        statusDetails: dict = None,
        stepTimeoutInMinutes: int = None,
    ) -> bool:
        """
        Start next pending job execution.

        Args:
            statusDetails (dict): Optional status details for job start
            stepTimeoutInMinutes (int): Timeout for job step execution

        Returns:
            bool: True if start request sent, False otherwise
        """

    def sendJobsUpdate(
        self,
        jobId: str,
        status: tuple,
        statusDetails: dict = None,
        expectedVersion: int = 0,
        executionNumber: int = 0,
        includeJobExecutionState: bool = False,
        includeJobDocument: bool = False,
        stepTimeoutInMinutes: int = None,
    ) -> bool:
        """
        Update job execution status and progress.

        Args:
            jobId (str): Job ID to update
            status (tuple): New job status; pass the jobExecutionStatus member
                itself (e.g. jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS),
                not one of its elements
            statusDetails (dict): Optional status details and progress information
            expectedVersion (int): Expected current version for optimistic locking
            executionNumber (int): Execution number (0 for latest)
            includeJobExecutionState (bool): Include execution state in response
            includeJobDocument (bool): Include job document in response
            stepTimeoutInMinutes (int): Timeout for next job step

        Returns:
            bool: True if update sent successfully, False otherwise
        """


# Job execution topic types
class jobExecutionTopicType:
    """Job execution topic types.

    Each member is a 3-tuple. The first element is a numeric identifier and
    the last is the topic suffix string. The middle boolean is True only for
    the describe and update topics — presumably "a job ID is required";
    confirm against the SDK.
    """

    JOB_UNRECOGNIZED_TOPIC = (0, False, '')
    JOB_GET_PENDING_TOPIC = (1, False, 'get')  # Get list of pending jobs
    JOB_START_NEXT_TOPIC = (2, False, 'start-next')  # Start next pending job
    JOB_DESCRIBE_TOPIC = (3, True, 'get')  # Describe specific job
    JOB_UPDATE_TOPIC = (4, True, 'update')  # Update job status
    JOB_NOTIFY_TOPIC = (5, False, 'notify')  # Job notifications
    JOB_NOTIFY_NEXT_TOPIC = (6, False, 'notify-next')  # Next job notifications
    JOB_WILDCARD_TOPIC = (7, False, '+')  # Wildcard subscription
# Job execution reply topic types
class jobExecutionTopicReplyType:
    """Job execution reply topic types.

    Each member is a 2-tuple of a numeric identifier and the suffix that is
    appended to a job topic to select the request or one of its reply topics.
    """

    JOB_UNRECOGNIZED_TOPIC_TYPE = (0, '')
    JOB_REQUEST_TYPE = (1, '')  # Request topic (no suffix)
    JOB_ACCEPTED_REPLY_TYPE = (2, '/accepted')  # Accepted response topic
    JOB_REJECTED_REPLY_TYPE = (3, '/rejected')  # Rejected response topic
    JOB_WILDCARD_REPLY_TYPE = (4, '/#')  # Wildcard reply subscription
# Job execution status values
class jobExecutionStatus:
    """Job execution status values.

    Each member is a 2-tuple of a numeric identifier and the status string
    (None where no status string applies).
    """

    JOB_EXECUTION_STATUS_NOT_SET = (0, None)
    JOB_EXECUTION_QUEUED = (1, 'QUEUED')  # Job is queued for execution
    JOB_EXECUTION_IN_PROGRESS = (2, 'IN_PROGRESS')  # Job is currently executing
    JOB_EXECUTION_FAILED = (3, 'FAILED')  # Job execution failed
    JOB_EXECUTION_SUCCEEDED = (4, 'SUCCEEDED')  # Job completed successfully
    JOB_EXECUTION_CANCELED = (5, 'CANCELED')  # Job was canceled
    JOB_EXECUTION_REJECTED = (6, 'REJECTED')  # Job was rejected
    JOB_EXECUTION_UNKNOWN_STATUS = (99, None)  # Unknown status


import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
import json
# Build the thing-scoped jobs client, point it at the broker endpoint,
# load the TLS credential files, and open the connection.
jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient(
    "myJobsClient",
    "myThingName",
)
jobsClient.configureEndpoint(
    "endpoint.iot.region.amazonaws.com",
    8883,
)
jobsClient.configureCredentials(
    "rootCA.crt",
    "private.key",
    "certificate.crt",
)
jobsClient.connect()
# Job notification callback
def jobNotificationCallback(client, userdata, message):
    """Print an incoming job notification and, when present, its execution summary.

    Args:
        client: MQTT client instance (unused).
        userdata: User data passed to the callback (unused).
        message: MQTT message with .topic and .payload (UTF-8 JSON bytes).
    """
    # Decode once and reuse the text for both logging and parsing.
    text = message.payload.decode()
    print(f"Job notification on {message.topic}: {text}")
    payload = json.loads(text)
    if "execution" in payload:
        job_id = payload["execution"]["jobId"]
        status = payload["execution"]["status"]
        print(f"Job {job_id} status: {status}")
# Subscribe to job notifications
jobsClient.createJobSubscription(
    jobNotificationCallback,
    jobExecutionTopicType.JOB_NOTIFY_TOPIC,
)

# Subscribe to next job notifications
jobsClient.createJobSubscription(
    jobNotificationCallback,
    jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC,
)

# Subscribe to the accepted reply of the pending-jobs query first; without
# this subscription the answer to the query below is never delivered.
jobsClient.createJobSubscription(
    jobNotificationCallback,
    jobExecutionTopicType.JOB_GET_PENDING_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE,
)

# Query pending jobs
jobsClient.sendJobsQuery(jobExecutionTopicType.JOB_GET_PENDING_TOPIC)

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus
import json
import time
# Build the jobs client for this device, point it at the broker endpoint,
# load the TLS credential files, and open the connection.
jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient(
    "deviceJobExecutor",
    "myDevice",
)
jobsClient.configureEndpoint(
    "endpoint.iot.region.amazonaws.com",
    8883,
)
jobsClient.configureCredentials(
    "rootCA.crt",
    "private.key",
    "certificate.crt",
)
jobsClient.connect()

# ID of the job currently being executed; set by the job response callback.
current_job_id = None
# Job response callback
def jobResponseCallback(client, userdata, message):
    """Handle a job response and start executing the job it describes.

    Messages whose payload carries no "execution" object with a non-empty
    "jobId" are logged and otherwise ignored, so replies that describe no
    job do not raise inside the MQTT callback.

    Args:
        client: MQTT client instance (unused).
        userdata: User data passed to the callback (unused).
        message: MQTT message with .topic and .payload (UTF-8 JSON bytes).
    """
    global current_job_id

    text = message.payload.decode()
    print(f"Job response on {message.topic}: {text}")
    payload = json.loads(text)

    # Handle start-next response
    execution = payload.get("execution") if isinstance(payload, dict) else None
    if not isinstance(execution, dict) or not execution.get("jobId"):
        return

    current_job_id = execution["jobId"]
    # The job document may be absent; pass None through rather than raising.
    job_document = execution.get("jobDocument")
    print(f"Started job {current_job_id}")
    print(f"Job document: {job_document}")

    # Execute job logic here
    execute_job(current_job_id, job_document)
def execute_job(job_id, job_document):
    """Execute the job and report progress.

    Sends an IN_PROGRESS update, simulates three work steps with a progress
    update after each, then reports SUCCEEDED. Any exception raised along the
    way is reported to the service as FAILED.

    Args:
        job_id: ID of the job execution to run and report on.
        job_document: Job document received with the job (not inspected by
            this simulated implementation).
    """
    total_steps = 3
    try:
        print(f"Executing job {job_id}...")

        # Update job status to IN_PROGRESS.
        # sendJobsUpdate is given the jobExecutionStatus member itself; the
        # SDK derives the status string from it, so do not index into it.
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS,
            {"progress": "starting execution"},
        )

        # Simulate job execution steps
        for step in range(1, total_steps + 1):
            print(f"Executing step {step}...")
            time.sleep(2)  # Simulate work

            # Report progress
            jobsClient.sendJobsUpdate(
                job_id,
                jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS,
                {"progress": f"completed step {step}/{total_steps}"},
            )

        # Job completed successfully
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_SUCCEEDED,
            {"result": "job completed successfully"},
        )
        print(f"Job {job_id} completed successfully")
    except Exception as e:
        # Job failed: this is the job's outermost boundary, so report any
        # error to the service instead of letting it escape unreported.
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_FAILED,
            {"error": str(e)},
        )
        print(f"Job {job_id} failed: {e}")
# Subscribe to job responses
jobsClient.createJobSubscription(
    jobResponseCallback,
    jobExecutionTopicType.JOB_START_NEXT_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE,
)
jobsClient.createJobSubscription(
    jobResponseCallback,
    jobExecutionTopicType.JOB_UPDATE_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE,
)

# Start next pending job
print("Starting next pending job...")
jobsClient.sendJobsStartNext({"device": "ready for work"})

# Keep running to process jobs; Ctrl-C disconnects cleanly.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    jobsClient.disconnect()

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus
import json
# Build the jobs client with QoS 1 as its default, point it at the broker
# endpoint, load the TLS credential files, and open the connection.
jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient(
    "advancedJobClient",
    "myDevice",
    QoS=1,
)
jobsClient.configureEndpoint(
    "endpoint.iot.region.amazonaws.com",
    8883,
)
jobsClient.configureCredentials(
    "rootCA.crt",
    "private.key",
    "certificate.crt",
)
jobsClient.connect()
# Advanced job callback with detailed handling
def advancedJobCallback(client, userdata, message):
    """Dispatch a job reply according to whether it was accepted or rejected.

    Accepted replies that carry an "execution" object are logged, and when
    that object includes a job document the job is processed with its version
    number. Rejected replies have their error code and message logged.

    Args:
        client: MQTT client instance (unused).
        userdata: User data passed to the callback (unused).
        message: MQTT message with .topic and .payload (UTF-8 JSON bytes).
    """
    topic = message.topic
    payload = json.loads(message.payload.decode())
    print(f"Received job message on {topic}")

    if "/accepted" in topic:
        if "execution" in payload:
            execution = payload["execution"]
            job_id = execution["jobId"]
            status = execution["status"]
            version = execution["versionNumber"]
            print(f"Job {job_id} accepted - Status: {status}, Version: {version}")

            # Handle job document updates with versioning
            if "jobDocument" in execution:
                job_doc = execution["jobDocument"]
                process_job_with_version(job_id, job_doc, version)
    elif "/rejected" in topic:
        error_code = payload.get("code", "Unknown")
        error_message = payload.get("message", "No details")
        print(f"Job operation rejected - Code: {error_code}, Message: {error_message}")
def process_job_with_version(job_id, job_document, expected_version):
    """Process job with version control for concurrent updates.

    Sends an IN_PROGRESS update guarded by the expected version, simulates the
    work, then sends SUCCEEDED guarded by the next version. Any exception is
    reported as FAILED using the version tracked up to that point.

    Args:
        job_id: ID of the job execution to process.
        job_document: Job document; read with .get("operation") and
            .get("parameters").
        expected_version: Version number the job execution is expected to
            have before the first update.
    """
    # Local import: this example does not import time at module level.
    import time

    # Version the execution is expected to hold right now. It is bumped after
    # each update we send, on the assumption that the update is accepted.
    current_version = expected_version
    try:
        # Process job document
        operation = job_document.get("operation", "unknown")
        parameters = job_document.get("parameters", {})
        print(f"Processing {operation} with parameters: {parameters}")

        # Update with expected version for optimistic locking.
        # sendJobsUpdate is given the jobExecutionStatus member itself; the
        # SDK derives the status string from it, so do not index into it.
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS,
            statusDetails={"operation": operation, "stage": "processing"},
            expectedVersion=current_version,
            includeJobExecutionState=True,
        )
        current_version += 1  # Version incremented by previous update

        # Simulate processing time
        time.sleep(3)

        # Complete with final status
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_SUCCEEDED,
            statusDetails={"operation": operation, "result": "completed"},
            expectedVersion=current_version,
            includeJobDocument=False,
        )
    except Exception as e:
        # Use the tracked version, not the initial one: after a successful
        # first update the initial version would be stale.
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_FAILED,
            statusDetails={"error": str(e)},
            expectedVersion=current_version,
        )
# Subscribe to multiple job reply topics, all handled by the same callback
jobsClient.createJobSubscription(
    advancedJobCallback,
    jobExecutionTopicType.JOB_START_NEXT_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE,
)
jobsClient.createJobSubscription(
    advancedJobCallback,
    jobExecutionTopicType.JOB_UPDATE_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE,
)
jobsClient.createJobSubscription(
    advancedJobCallback,
    jobExecutionTopicType.JOB_UPDATE_TOPIC,
    jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE,
)

# Query specific job details
specific_job_id = "job-12345"
jobsClient.sendJobsDescribe(
    specific_job_id,
    executionNumber=0,
    includeJobDocument=True,
)

# Start next job with timeout
jobsClient.sendJobsStartNext(
    statusDetails={"device_info": "ready", "capabilities": ["firmware_update", "config_change"]},
    stepTimeoutInMinutes=30,
)

# Job execution callback signature
def jobCallback(client, userdata: dict, message) -> None:
    """
    Job operation callback.

    Signature reference only; the function has no body.

    Args:
        client: MQTT client instance
        userdata (dict): User data passed to callback
        message: MQTT message with .topic and .payload attributes
    """
# Job document structure (example)
# Sample job document: an operation name, its parameters, and a timeout.
job_document = {
    "operation": "firmware_update",
    "parameters": {
        "firmware_url": "https://example.com/firmware.bin",
        "version": "1.2.3",
        "checksum": "sha256:abc123...",
    },
    "timeout": 3600,
}
# Job execution response structure (example)
# Sample job execution response: the execution record plus message metadata.
job_execution_response = {
    "execution": {
        "jobId": "job-12345",
        "status": "IN_PROGRESS",
        "statusDetails": {
            "progress": "50%",
            "step": "downloading",
        },
        "queuedAt": 1609459200,
        "startedAt": 1609459210,
        "lastUpdatedAt": 1609459250,
        "versionNumber": 2,
        "executionNumber": 1,
        "jobDocument": {
            "operation": "firmware_update",
            # Placeholder for the document's parameters (elided in this
            # example); Python parses {...} as a set holding Ellipsis.
            "parameters": {...},
        },
    },
    "timestamp": 1609459250,
    "clientToken": "token-abc-123",
}

# Install with Tessl CLI
npx tessl i tessl/pypi-awsiotpythonsdk