CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-awsiotpythonsdk

SDK for connecting to AWS IoT using Python.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

iot-jobs.mddocs/

AWS IoT Jobs

Job execution management for device fleet management, including job subscription, execution state management, progress reporting, and job lifecycle operations. AWS IoT Jobs enable you to define a set of remote operations that can be sent to and executed on one or more devices connected to AWS IoT.

Capabilities

Jobs Client Creation

Create a specialized MQTT client for AWS IoT Jobs operations with thing-specific configuration and job management capabilities.

class AWSIoTMQTTThingJobsClient:
    def __init__(self, clientID: str, thingName: str, QoS: int = 0, protocolType: int = MQTTv3_1_1, useWebsocket: bool = False, cleanSession: bool = True, awsIoTMQTTClient = None):
        """
        Create AWS IoT MQTT Jobs client for thing-specific job operations.
        
        Args:
            clientID (str): Client identifier for MQTT connection and job tokens
            thingName (str): AWS IoT thing name for job topic routing
            QoS (int): Default QoS level for all job operations (0 or 1)
            protocolType (int): MQTT version (MQTTv3_1=3, MQTTv3_1_1=4)
            useWebsocket (bool): Enable MQTT over WebSocket SigV4
            cleanSession (bool): Start with clean session state
            awsIoTMQTTClient (AWSIoTMQTTClient): Existing MQTT client to reuse (optional)
        """

Job Subscriptions

Subscribe to various job-related topics for monitoring job state changes and receiving job notifications.

def createJobSubscription(self, callback: callable, jobExecutionType = None, jobReplyType = None, jobId: str = None) -> bool:
    """
    Subscribe to job-related topic synchronously.
    
    Args:
        callback (callable): Message callback (client, userdata, message) -> None
        jobExecutionType: Topic type from jobExecutionTopicType enum
        jobReplyType: Reply type from jobExecutionTopicReplyType enum  
        jobId (str): Specific job ID or None for wildcard
        
    Returns:
        bool: True if subscription successful, False otherwise
    """

def createJobSubscriptionAsync(self, ackCallback: callable, callback: callable, jobExecutionType = None, jobReplyType = None, jobId: str = None) -> int:
    """
    Subscribe to job-related topic asynchronously.
    
    Args:
        ackCallback (callable): SUBACK callback (mid, data) -> None
        callback (callable): Message callback (client, userdata, message) -> None
        jobExecutionType: Topic type from jobExecutionTopicType enum
        jobReplyType: Reply type from jobExecutionTopicReplyType enum
        jobId (str): Specific job ID or None for wildcard
        
    Returns:
        int: Packet ID for tracking in callback
    """

Job Queries

Query job information including pending jobs list and specific job descriptions.

def sendJobsQuery(self, jobExecTopicType, jobId: str = None) -> bool:
    """
    Send job query request.
    
    Args:
        jobExecTopicType: Query type from jobExecutionTopicType enum
        jobId (str): Job ID for specific queries, or None for general queries
        
    Returns:
        bool: True if query sent successfully, False otherwise
    """

def sendJobsDescribe(self, jobId: str, executionNumber: int = 0, includeJobDocument: bool = True) -> bool:
    """
    Request description of specific job execution.
    
    Args:
        jobId (str): Job ID to describe (can be '$next' for next pending job)
        executionNumber (int): Specific execution number (0 for latest)
        includeJobDocument (bool): Include job document in response
        
    Returns:
        bool: True if describe request sent, False otherwise
    """

Job Execution Management

Start, update, and manage job execution lifecycle and status reporting.

def sendJobsStartNext(self, statusDetails: dict = None, stepTimeoutInMinutes: int = None) -> bool:
    """
    Start next pending job execution.
    
    Args:
        statusDetails (dict): Optional status details for job start
        stepTimeoutInMinutes (int): Timeout for job step execution
        
    Returns:
        bool: True if start request sent, False otherwise
    """

def sendJobsUpdate(self, jobId: str, status: int, statusDetails: dict = None, expectedVersion: int = 0, executionNumber: int = 0, includeJobExecutionState: bool = False, includeJobDocument: bool = False, stepTimeoutInMinutes: int = None) -> bool:
    """
    Update job execution status and progress.
    
    Args:
        jobId (str): Job ID to update
        status (int): New job status from jobExecutionStatus enum
        statusDetails (dict): Optional status details and progress information
        expectedVersion (int): Expected current version for optimistic locking
        executionNumber (int): Execution number (0 for latest)
        includeJobExecutionState (bool): Include execution state in response
        includeJobDocument (bool): Include job document in response
        stepTimeoutInMinutes (int): Timeout for next job step
        
    Returns:
        bool: True if update sent successfully, False otherwise
    """

Job Topic Types and Constants

# Job execution topic types
class jobExecutionTopicType:
    JOB_UNRECOGNIZED_TOPIC = (0, False, '')
    JOB_GET_PENDING_TOPIC = (1, False, 'get')              # Get list of pending jobs
    JOB_START_NEXT_TOPIC = (2, False, 'start-next')        # Start next pending job
    JOB_DESCRIBE_TOPIC = (3, True, 'get')                  # Describe specific job
    JOB_UPDATE_TOPIC = (4, True, 'update')                 # Update job status
    JOB_NOTIFY_TOPIC = (5, False, 'notify')                # Job notifications
    JOB_NOTIFY_NEXT_TOPIC = (6, False, 'notify-next')      # Next job notifications
    JOB_WILDCARD_TOPIC = (7, False, '+')                   # Wildcard subscription

# Job execution reply topic types
class jobExecutionTopicReplyType:
    JOB_UNRECOGNIZED_TOPIC_TYPE = (0, '')
    JOB_REQUEST_TYPE = (1, '')                              # Request topic (no suffix)
    JOB_ACCEPTED_REPLY_TYPE = (2, '/accepted')             # Accepted response topic
    JOB_REJECTED_REPLY_TYPE = (3, '/rejected')             # Rejected response topic
    JOB_WILDCARD_REPLY_TYPE = (4, '/#')                    # Wildcard reply subscription

# Job execution status values
class jobExecutionStatus:
    JOB_EXECUTION_STATUS_NOT_SET = (0, None)
    JOB_EXECUTION_QUEUED = (1, 'QUEUED')                   # Job is queued for execution
    JOB_EXECUTION_IN_PROGRESS = (2, 'IN_PROGRESS')         # Job is currently executing
    JOB_EXECUTION_FAILED = (3, 'FAILED')                   # Job execution failed
    JOB_EXECUTION_SUCCEEDED = (4, 'SUCCEEDED')             # Job completed successfully
    JOB_EXECUTION_CANCELED = (5, 'CANCELED')               # Job was canceled
    JOB_EXECUTION_REJECTED = (6, 'REJECTED')               # Job was rejected
    JOB_EXECUTION_UNKNOWN_STATUS = (99, None)              # Unknown status

Usage Examples

Basic Job Monitoring

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
import json

# Create jobs client
jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("myJobsClient", "myThingName")
jobsClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
jobsClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
jobsClient.connect()

# Job notification callback
def jobNotificationCallback(client, userdata, message):
    print(f"Job notification on {message.topic}: {message.payload.decode()}")
    payload = json.loads(message.payload.decode())
    
    if "execution" in payload:
        job_id = payload["execution"]["jobId"]
        status = payload["execution"]["status"]
        print(f"Job {job_id} status: {status}")

# Subscribe to job notifications
jobsClient.createJobSubscription(
    jobNotificationCallback,
    jobExecutionTopicType.JOB_NOTIFY_TOPIC
)

# Subscribe to next job notifications  
jobsClient.createJobSubscription(
    jobNotificationCallback,
    jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC
)

# Query pending jobs
jobsClient.sendJobsQuery(jobExecutionTopicType.JOB_GET_PENDING_TOPIC)

Job Execution Workflow

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus
import json
import time

# Create jobs client
jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("deviceJobExecutor", "myDevice")
jobsClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
jobsClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
jobsClient.connect()

current_job_id = None

# Job response callback
def jobResponseCallback(client, userdata, message):
    global current_job_id
    print(f"Job response on {message.topic}: {message.payload.decode()}")
    
    payload = json.loads(message.payload.decode())
    
    # Handle start-next response
    if "execution" in payload and payload["execution"]["jobId"]:
        current_job_id = payload["execution"]["jobId"]
        job_document = payload["execution"]["jobDocument"]
        
        print(f"Started job {current_job_id}")
        print(f"Job document: {job_document}")
        
        # Execute job logic here
        execute_job(current_job_id, job_document)

def execute_job(job_id, job_document):
    """Execute the job and report progress"""
    try:
        print(f"Executing job {job_id}...")
        
        # Update job status to IN_PROGRESS
        jobsClient.sendJobsUpdate(
            job_id, 
            jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS[0],
            {"progress": "starting execution"}
        )
        
        # Simulate job execution steps
        for step in range(1, 4):
            print(f"Executing step {step}...")
            time.sleep(2)  # Simulate work
            
            # Report progress
            jobsClient.sendJobsUpdate(
                job_id,
                jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS[0],
                {"progress": f"completed step {step}/3"}
            )
        
        # Job completed successfully
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_SUCCEEDED[0],
            {"result": "job completed successfully"}
        )
        
        print(f"Job {job_id} completed successfully")
        
    except Exception as e:
        # Job failed
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_FAILED[0],
            {"error": str(e)}
        )
        print(f"Job {job_id} failed: {e}")

# Subscribe to job responses
jobsClient.createJobSubscription(
    jobResponseCallback,
    jobExecutionTopicType.JOB_START_NEXT_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE
)

jobsClient.createJobSubscription(
    jobResponseCallback,
    jobExecutionTopicType.JOB_UPDATE_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE
)

# Start next pending job
print("Starting next pending job...")
jobsClient.sendJobsStartNext({"device": "ready for work"})

# Keep running to process jobs
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    jobsClient.disconnect()

Advanced Job Management

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus
import json

# Create jobs client with higher QoS
jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("advancedJobClient", "myDevice", QoS=1)
jobsClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
jobsClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
jobsClient.connect()

# Advanced job callback with detailed handling
def advancedJobCallback(client, userdata, message):
    topic = message.topic
    payload = json.loads(message.payload.decode())
    
    print(f"Received job message on {topic}")
    
    if "/accepted" in topic:
        if "execution" in payload:
            job_id = payload["execution"]["jobId"]
            status = payload["execution"]["status"]
            version = payload["execution"]["versionNumber"]
            
            print(f"Job {job_id} accepted - Status: {status}, Version: {version}")
            
            # Handle job document updates with versioning
            if "jobDocument" in payload["execution"]:
                job_doc = payload["execution"]["jobDocument"]
                process_job_with_version(job_id, job_doc, version)
    
    elif "/rejected" in topic:
        error_code = payload.get("code", "Unknown")
        error_message = payload.get("message", "No details")
        print(f"Job operation rejected - Code: {error_code}, Message: {error_message}")

def process_job_with_version(job_id, job_document, expected_version):
    """Process job with version control for concurrent updates"""
    try:
        # Process job document
        operation = job_document.get("operation", "unknown")
        parameters = job_document.get("parameters", {})
        
        print(f"Processing {operation} with parameters: {parameters}")
        
        # Update with expected version for optimistic locking
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS[0],
            statusDetails={"operation": operation, "stage": "processing"},
            expectedVersion=expected_version,
            includeJobExecutionState=True
        )
        
        # Simulate processing time
        import time
        time.sleep(3)
        
        # Complete with final status
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_SUCCEEDED[0],
            statusDetails={"operation": operation, "result": "completed"},
            expectedVersion=expected_version + 1,  # Version incremented by previous update
            includeJobDocument=False
        )
        
    except Exception as e:
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_FAILED[0],
            statusDetails={"error": str(e)},
            expectedVersion=expected_version
        )

# Subscribe to multiple job topics with different callbacks
jobsClient.createJobSubscription(
    advancedJobCallback,
    jobExecutionTopicType.JOB_START_NEXT_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE
)

jobsClient.createJobSubscription(
    advancedJobCallback,
    jobExecutionTopicType.JOB_UPDATE_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE
)

jobsClient.createJobSubscription(
    advancedJobCallback,
    jobExecutionTopicType.JOB_UPDATE_TOPIC,
    jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE
)

# Query specific job details
specific_job_id = "job-12345"
jobsClient.sendJobsDescribe(
    specific_job_id,
    executionNumber=0,
    includeJobDocument=True
)

# Start next job with timeout
jobsClient.sendJobsStartNext(
    statusDetails={"device_info": "ready", "capabilities": ["firmware_update", "config_change"]},
    stepTimeoutInMinutes=30
)

Types

# Job execution callback signature
def jobCallback(client, userdata: dict, message) -> None:
    """
    Job operation callback.
    
    Args:
        client: MQTT client instance
        userdata (dict): User data passed to callback
        message: MQTT message with .topic and .payload attributes
    """

# Job document structure (example)
job_document = {
    "operation": "firmware_update",
    "parameters": {
        "firmware_url": "https://example.com/firmware.bin",
        "version": "1.2.3",
        "checksum": "sha256:abc123..."
    },
    "timeout": 3600
}

# Job execution response structure (example)
job_execution_response = {
    "execution": {
        "jobId": "job-12345",
        "status": "IN_PROGRESS",
        "statusDetails": {
            "progress": "50%",
            "step": "downloading"
        },
        "queuedAt": 1609459200,
        "startedAt": 1609459210,
        "lastUpdatedAt": 1609459250,
        "versionNumber": 2,
        "executionNumber": 1,
        "jobDocument": {
            "operation": "firmware_update",
            "parameters": {...}
        }
    },
    "timestamp": 1609459250,
    "clientToken": "token-abc-123"
}

Install with Tessl CLI

npx tessl i tessl/pypi-awsiotpythonsdk

docs

device-shadows.md

exception-handling.md

greengrass-discovery.md

index.md

iot-jobs.md

mqtt-client.md

tile.json