CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-awsiotsdk

AWS IoT SDK based on the AWS Common Runtime for connecting IoT devices to AWS IoT Core services

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

iot-jobs.mddocs/

IoT Jobs Management

Job execution capabilities for AWS IoT device management including job discovery, execution updates, status reporting, and real-time job notifications with support for both pending and in-progress job handling through V1 (callback-based) and V2 (Future-based) client interfaces.

Capabilities

V1 Jobs Client (Callback-Based)

The V1 client provides callback-based operations following the traditional publish/subscribe pattern.

Client Creation

class IotJobsClient(MqttServiceClient):
    """
    V1 client for AWS IoT Jobs service with callback-based operations.
    
    Parameters:
    - mqtt_connection: MQTT connection (mqtt.Connection or mqtt5.Client)
    """
    def __init__(self, mqtt_connection): ...

Usage example:

from awsiot import mqtt_connection_builder, iotjobs

# Create MQTT connection
connection = mqtt_connection_builder.mtls_from_path(
    endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
    cert_filepath="/path/to/certificate.pem.crt",
    pri_key_filepath="/path/to/private.pem.key",
    client_id="jobs-client-123"
)

# Create jobs client
jobs_client = iotjobs.IotJobsClient(connection)

Job Discovery and Execution

Get Pending Job Executions
def publish_get_pending_job_executions(self, request, qos):
    """
    Publish request to get list of pending job executions.
    
    Parameters:
    - request (GetPendingJobExecutionsRequest): Request for pending jobs
    - qos (awscrt.mqtt.QoS): Quality of service
    
    Returns:
    Future: Future that completes when request is published
    """

def subscribe_to_get_pending_job_executions_accepted(self, request, qos, callback):
    """
    Subscribe to successful get pending jobs responses.
    
    Parameters:
    - request (GetPendingJobExecutionsSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called with pending jobs response
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """

def subscribe_to_get_pending_job_executions_rejected(self, request, qos, callback):
    """
    Subscribe to rejected get pending jobs responses.
    
    Parameters:
    - request (GetPendingJobExecutionsSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called when request is rejected
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """
Start Next Pending Job Execution
def publish_start_next_pending_job_execution(self, request, qos):
    """
    Publish request to start the next pending job execution.
    
    Parameters:
    - request (StartNextPendingJobExecutionRequest): Start job request
    - qos (awscrt.mqtt.QoS): Quality of service
    
    Returns:
    Future: Future that completes when request is published
    """

def subscribe_to_start_next_pending_job_execution_accepted(self, request, qos, callback):
    """
    Subscribe to successful start next job responses.
    
    Parameters:
    - request (StartNextPendingJobExecutionSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called when job start succeeds
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """

def subscribe_to_start_next_pending_job_execution_rejected(self, request, qos, callback):
    """
    Subscribe to rejected start next job responses.
    """
Describe Job Execution
def publish_describe_job_execution(self, request, qos):
    """
    Publish request to describe a specific job execution.
    
    Parameters:
    - request (DescribeJobExecutionRequest): Describe job request
    - qos (awscrt.mqtt.QoS): Quality of service
    
    Returns:
    Future: Future that completes when request is published
    """

def subscribe_to_describe_job_execution_accepted(self, request, qos, callback):
    """
    Subscribe to successful describe job responses.
    """

def subscribe_to_describe_job_execution_rejected(self, request, qos, callback):
    """
    Subscribe to rejected describe job responses.
    """
Update Job Execution
def publish_update_job_execution(self, request, qos):
    """
    Publish request to update job execution status.
    
    Parameters:
    - request (UpdateJobExecutionRequest): Update job request
    - qos (awscrt.mqtt.QoS): Quality of service
    
    Returns:
    Future: Future that completes when request is published
    """

def subscribe_to_update_job_execution_accepted(self, request, qos, callback):
    """
    Subscribe to successful update job responses.
    """

def subscribe_to_update_job_execution_rejected(self, request, qos, callback):
    """
    Subscribe to rejected update job responses.
    """

Job Event Subscriptions

Job Executions Changed Events
def subscribe_to_job_executions_changed_events(self, request, qos, callback):
    """
    Subscribe to job executions changed events.
    
    Parameters:
    - request (JobExecutionsChangedSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called when job executions change
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """
Next Job Execution Changed Events
def subscribe_to_next_job_execution_changed_events(self, request, qos, callback):
    """
    Subscribe to next job execution changed events.
    
    Parameters:
    - request (NextJobExecutionChangedSubscriptionRequest): Subscription request
    - qos (awscrt.mqtt.QoS): Quality of service
    - callback: Function called when next job execution changes
    
    Returns:
    Tuple[Future, str]: Future for subscription, topic string
    """

V2 Jobs Client (Future-Based)

The V2 client provides Future-based operations with request-response semantics.

Client Creation

class IotJobsClientV2:
    """
    V2 client for AWS IoT Jobs service with Future-based operations.
    """
    def __init__(self, connection): ...

Job Operations

def get_pending_job_executions(self, request):
    """
    Get list of pending job executions using request-response pattern.
    
    Parameters:
    - request (GetPendingJobExecutionsRequest): Request for pending jobs
    
    Returns:
    Future[GetPendingJobExecutionsResponse]: Future containing pending jobs response
    """

def start_next_pending_job_execution(self, request):
    """
    Start the next pending job execution using request-response pattern.
    
    Parameters:
    - request (StartNextPendingJobExecutionRequest): Start job request
    
    Returns:
    Future[StartNextJobExecutionResponse]: Future containing job start response
    """

def describe_job_execution(self, request):
    """
    Describe a specific job execution using request-response pattern.
    
    Parameters:
    - request (DescribeJobExecutionRequest): Describe job request
    
    Returns:
    Future[DescribeJobExecutionResponse]: Future containing job description
    """

def update_job_execution(self, request):
    """
    Update job execution status using request-response pattern.
    
    Parameters:
    - request (UpdateJobExecutionRequest): Update job request
    
    Returns:
    Future[UpdateJobExecutionResponse]: Future containing update response
    """

Streaming Operations

def create_job_executions_changed_stream(self, request, options):
    """
    Create streaming operation for job executions changed events.
    
    Parameters:
    - request (JobExecutionsChangedSubscriptionRequest): Stream request
    - options (ServiceStreamOptions): Stream configuration
    
    Returns:
    StreamingOperation: Streaming operation handle
    """

def create_next_job_execution_changed_stream(self, request, options):
    """
    Create streaming operation for next job execution changed events.
    
    Parameters:
    - request (NextJobExecutionChangedSubscriptionRequest): Stream request
    - options (ServiceStreamOptions): Stream configuration
    
    Returns:
    StreamingOperation: Streaming operation handle
    """

Data Model Classes

Request Classes

@dataclass
class GetPendingJobExecutionsRequest:
    """Request to get pending job executions."""
    thing_name: str

@dataclass
class StartNextPendingJobExecutionRequest:
    """Request to start next pending job execution."""
    thing_name: str
    step_timeout_in_minutes: Optional[int] = None

@dataclass
class DescribeJobExecutionRequest:
    """Request to describe a job execution."""
    thing_name: str
    job_id: str
    execution_number: Optional[int] = None
    include_job_document: Optional[bool] = None

@dataclass
class UpdateJobExecutionRequest:
    """Request to update job execution status."""
    thing_name: str
    job_id: str
    status: str  # JobStatus constant
    status_details: Optional[Dict[str, str]] = None
    step_timeout_in_minutes: Optional[int] = None
    expected_version: Optional[int] = None
    execution_number: Optional[int] = None
    include_job_execution_state: Optional[bool] = None
    include_job_document: Optional[bool] = None

Response Classes

@dataclass
class GetPendingJobExecutionsResponse:
    """Response from get pending job executions."""
    in_progress_jobs: Optional[List[JobExecutionSummary]] = None
    queued_jobs: Optional[List[JobExecutionSummary]] = None
    timestamp: Optional[datetime.datetime] = None
    client_token: Optional[str] = None

@dataclass
class StartNextJobExecutionResponse:
    """Response from start next job execution."""
    execution: Optional[JobExecutionData] = None
    timestamp: Optional[datetime.datetime] = None
    client_token: Optional[str] = None

@dataclass
class DescribeJobExecutionResponse:
    """Response from describe job execution."""
    execution: Optional[JobExecutionData] = None
    timestamp: Optional[datetime.datetime] = None
    client_token: Optional[str] = None

@dataclass
class UpdateJobExecutionResponse:
    """Response from update job execution."""
    execution_state: Optional[JobExecutionState] = None
    job_document: Optional[Dict[str, Any]] = None
    timestamp: Optional[datetime.datetime] = None
    client_token: Optional[str] = None

Job Data Classes

@dataclass
class JobExecutionData:
    """Complete job execution data."""
    job_id: Optional[str] = None
    thing_name: Optional[str] = None
    job_document: Optional[Dict[str, Any]] = None
    status: Optional[str] = None
    status_details: Optional[Dict[str, str]] = None
    queued_at: Optional[datetime.datetime] = None
    started_at: Optional[datetime.datetime] = None
    last_updated_at: Optional[datetime.datetime] = None
    version_number: Optional[int] = None
    execution_number: Optional[int] = None

@dataclass
class JobExecutionState:
    """Job execution state information."""
    status: Optional[str] = None
    status_details: Optional[Dict[str, str]] = None
    version_number: Optional[int] = None

@dataclass
class JobExecutionSummary:
    """Summary of job execution."""
    job_id: Optional[str] = None
    thing_name: Optional[str] = None
    version_number: Optional[int] = None
    execution_number: Optional[int] = None
    queued_at: Optional[datetime.datetime] = None
    started_at: Optional[datetime.datetime] = None
    last_updated_at: Optional[datetime.datetime] = None

Event Classes

@dataclass
class JobExecutionsChangedEvent:
    """Event when job executions change."""
    jobs: Optional[Dict[str, List[JobExecutionSummary]]] = None
    timestamp: Optional[datetime.datetime] = None

@dataclass
class NextJobExecutionChangedEvent:
    """Event when next job execution changes."""
    execution: Optional[JobExecutionData] = None
    timestamp: Optional[datetime.datetime] = None

Subscription Request Classes

@dataclass
class GetPendingJobExecutionsSubscriptionRequest:
    """Subscription request for get pending jobs responses."""
    thing_name: str

@dataclass
class JobExecutionsChangedSubscriptionRequest:
    """Subscription request for job executions changed events."""
    thing_name: str

@dataclass
class NextJobExecutionChangedSubscriptionRequest:
    """Subscription request for next job execution changed events."""
    thing_name: str

Error Classes

@dataclass
class RejectedError:
    """Error response from jobs operations."""
    code: Optional[str] = None  # RejectedErrorCode constant
    message: Optional[str] = None
    timestamp: Optional[datetime.datetime] = None
    client_token: Optional[str] = None

Constants

Job Status

class JobStatus:
    """Job execution status constants."""
    CANCELED = "CANCELED"
    FAILED = "FAILED"
    QUEUED = "QUEUED"
    IN_PROGRESS = "IN_PROGRESS"
    SUCCEEDED = "SUCCEEDED"
    TIMED_OUT = "TIMED_OUT"
    REJECTED = "REJECTED"
    REMOVED = "REMOVED"

Rejected Error Codes

class RejectedErrorCode:
    """Error code constants for rejected operations."""
    INTERNAL_ERROR = "InternalError"
    INVALID_JSON = "InvalidJson"
    INVALID_REQUEST = "InvalidRequest"
    INVALID_STATE_TRANSITION = "InvalidStateTransition"
    RESOURCE_NOT_FOUND = "ResourceNotFound"
    VERSION_MISMATCH = "VersionMismatch"
    INVALID_TOPIC = "InvalidTopic"
    REQUEST_THROTTLED = "RequestThrottled"
    TERMINAL_STATE_REACHED = "TerminalStateReached"

Usage Examples

V1 Client - Basic Job Processing Loop

from awsiot import mqtt_connection_builder, iotjobs
from awscrt import mqtt
import json
import time

# Create connection and client
connection = mqtt_connection_builder.mtls_from_path(
    endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
    cert_filepath="/path/to/certificate.pem.crt",
    pri_key_filepath="/path/to/private.pem.key",
    client_id="job-processor-123"
)

jobs_client = iotjobs.IotJobsClient(connection)
connection.connect().result()

# Track current job
current_job = None

def on_start_job_accepted(response):
    """Handle successful job start."""
    global current_job
    if response.execution:
        current_job = response.execution
        print(f"Started job: {current_job.job_id}")
        print(f"Job document: {current_job.job_document}")
        
        # Process the job (example: firmware update)
        success = process_job(current_job)
        
        # Update job status
        status = iotjobs.JobStatus.SUCCEEDED if success else iotjobs.JobStatus.FAILED
        update_request = iotjobs.UpdateJobExecutionRequest(
            thing_name="MyDevice",
            job_id=current_job.job_id,
            status=status,
            status_details={"result": "Job completed successfully" if success else "Job failed"}
        )
        jobs_client.publish_update_job_execution(update_request, mqtt.QoS.AT_LEAST_ONCE)

def on_start_job_rejected(error):
    """Handle job start rejection."""
    print(f"Job start rejected: {error}")

def on_update_job_accepted(response):
    """Handle successful job update."""
    print(f"Job update accepted: {response}")
    global current_job
    current_job = None

def on_update_job_rejected(error):
    """Handle job update rejection."""
    print(f"Job update rejected: {error}")

def on_next_job_changed(event):
    """Handle next job execution changed events."""
    if event.execution and not current_job:
        print("New job available, starting...")
        start_request = iotjobs.StartNextPendingJobExecutionRequest(
            thing_name="MyDevice",
            step_timeout_in_minutes=10
        )
        jobs_client.publish_start_next_pending_job_execution(start_request, mqtt.QoS.AT_LEAST_ONCE)

def process_job(job_execution):
    """Process the job - implement your job logic here."""
    print(f"Processing job: {job_execution.job_id}")
    
    # Example job processing based on job document
    job_doc = job_execution.job_document
    if job_doc.get("operation") == "firmware_update":
        firmware_url = job_doc.get("firmware_url")
        version = job_doc.get("version")
        print(f"Updating firmware to version {version} from {firmware_url}")
        
        # Simulate firmware update
        time.sleep(5)
        return True
        
    elif job_doc.get("operation") == "config_update":
        config = job_doc.get("config")
        print(f"Updating configuration: {config}")
        
        # Simulate config update
        time.sleep(2)
        return True
    
    return False

# Subscribe to job responses
jobs_client.subscribe_to_start_next_pending_job_execution_accepted(
    iotjobs.StartNextPendingJobExecutionSubscriptionRequest(thing_name="MyDevice"),
    mqtt.QoS.AT_LEAST_ONCE,
    on_start_job_accepted
).result()

jobs_client.subscribe_to_start_next_pending_job_execution_rejected(
    iotjobs.StartNextPendingJobExecutionSubscriptionRequest(thing_name="MyDevice"),
    mqtt.QoS.AT_LEAST_ONCE,
    on_start_job_rejected
).result()

jobs_client.subscribe_to_update_job_execution_accepted(
    iotjobs.UpdateJobExecutionSubscriptionRequest(thing_name="MyDevice"),
    mqtt.QoS.AT_LEAST_ONCE,
    on_update_job_accepted
).result()

jobs_client.subscribe_to_update_job_execution_rejected(
    iotjobs.UpdateJobExecutionSubscriptionRequest(thing_name="MyDevice"),
    mqtt.QoS.AT_LEAST_ONCE,
    on_update_job_rejected
).result()

# Subscribe to job change events
jobs_client.subscribe_to_next_job_execution_changed_events(
    iotjobs.NextJobExecutionChangedSubscriptionRequest(thing_name="MyDevice"),
    mqtt.QoS.AT_LEAST_ONCE,
    on_next_job_changed
).result()

# Check for initial pending jobs
start_request = iotjobs.StartNextPendingJobExecutionRequest(
    thing_name="MyDevice",
    step_timeout_in_minutes=10
)
jobs_client.publish_start_next_pending_job_execution(start_request, mqtt.QoS.AT_LEAST_ONCE)

print("Job processor started. Waiting for jobs...")

V2 Client - Request-Response Pattern

from awsiot import mqtt_connection_builder, iotjobs
import asyncio

async def job_processor():
    # Create connection
    connection = mqtt_connection_builder.mtls_from_path(
        endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
        cert_filepath="/path/to/certificate.pem.crt",
        pri_key_filepath="/path/to/private.pem.key",
        client_id="jobs-v2-client"
    )
    
    # Create V2 client
    jobs_client = iotjobs.IotJobsClientV2(connection)
    await connection.connect()
    
    try:
        while True:
            # Get pending jobs
            pending_request = iotjobs.GetPendingJobExecutionsRequest(thing_name="MyDevice")
            pending_response = await jobs_client.get_pending_job_executions(pending_request)
            
            if pending_response.queued_jobs:
                print(f"Found {len(pending_response.queued_jobs)} pending jobs")
                
                # Start next job
                start_request = iotjobs.StartNextPendingJobExecutionRequest(
                    thing_name="MyDevice",
                    step_timeout_in_minutes=15
                )
                start_response = await jobs_client.start_next_pending_job_execution(start_request)
                
                if start_response.execution:
                    job = start_response.execution
                    print(f"Started job: {job.job_id}")
                    
                    # Process job
                    success = await process_job_async(job)
                    
                    # Update job status
                    status = iotjobs.JobStatus.SUCCEEDED if success else iotjobs.JobStatus.FAILED
                    update_request = iotjobs.UpdateJobExecutionRequest(
                        thing_name="MyDevice",
                        job_id=job.job_id,
                        status=status,
                        status_details={"timestamp": str(datetime.now())}
                    )
                    
                    update_response = await jobs_client.update_job_execution(update_request)
                    print(f"Job {job.job_id} completed with status: {status}")
            else:
                print("No pending jobs, waiting...")
                await asyncio.sleep(30)
                
    except iotjobs.V2ServiceException as e:
        print(f"Jobs operation failed: {e.message}")
        if e.modeled_error:
            print(f"Error details: {e.modeled_error}")
    
    finally:
        await connection.disconnect()

async def process_job_async(job_execution):
    """Async job processing."""
    print(f"Processing job: {job_execution.job_id}")
    
    # Simulate async job processing
    await asyncio.sleep(3)
    
    return True

# Run async job processor
asyncio.run(job_processor())

Job Monitoring with Event Streams

from awsiot import iotjobs

def on_job_changed(event):
    """Handle job executions changed events."""
    print(f"Job executions changed: {event}")
    if event.jobs:
        for status, job_list in event.jobs.items():
            print(f"  {status}: {len(job_list)} jobs")

def on_next_job_changed(event):
    """Handle next job execution changed events."""
    print(f"Next job changed: {event}")
    if event.execution:
        print(f"  Next job: {event.execution.job_id}")

# Create stream options
stream_options = iotjobs.ServiceStreamOptions(
    incoming_event_listener=on_job_changed
)

next_job_stream_options = iotjobs.ServiceStreamOptions(
    incoming_event_listener=on_next_job_changed
)

# Create V2 client and streams
jobs_client = iotjobs.IotJobsClientV2(connection)

job_stream = jobs_client.create_job_executions_changed_stream(
    iotjobs.JobExecutionsChangedSubscriptionRequest(thing_name="MyDevice"),
    stream_options
)

next_job_stream = jobs_client.create_next_job_execution_changed_stream(
    iotjobs.NextJobExecutionChangedSubscriptionRequest(thing_name="MyDevice"),
    next_job_stream_options
)

Install with Tessl CLI

npx tessl i tessl/pypi-awsiotsdk

docs

device-shadow.md

fleet-provisioning.md

greengrass-discovery.md

greengrass-ipc.md

index.md

iot-jobs.md

mqtt-connections.md

tile.json