AWS IoT SDK based on the AWS Common Runtime for connecting IoT devices to AWS IoT Core services
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Job execution capabilities for AWS IoT device management including job discovery, execution updates, status reporting, and real-time job notifications with support for both pending and in-progress job handling through V1 (callback-based) and V2 (Future-based) client interfaces.
The V1 client provides callback-based operations following the traditional publish/subscribe pattern.
class IotJobsClient(MqttServiceClient):
"""
V1 client for AWS IoT Jobs service with callback-based operations.
Parameters:
- mqtt_connection: MQTT connection (mqtt.Connection or mqtt5.Client)
"""
def __init__(self, mqtt_connection): ...Usage example:
from awsiot import mqtt_connection_builder, iotjobs
# Create MQTT connection
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/certificate.pem.crt",
pri_key_filepath="/path/to/private.pem.key",
client_id="jobs-client-123"
)
# Create jobs client
jobs_client = iotjobs.IotJobsClient(connection)def publish_get_pending_job_executions(self, request, qos):
"""
Publish request to get list of pending job executions.
Parameters:
- request (GetPendingJobExecutionsRequest): Request for pending jobs
- qos (awscrt.mqtt.QoS): Quality of service
Returns:
Future: Future that completes when request is published
"""
def subscribe_to_get_pending_job_executions_accepted(self, request, qos, callback):
"""
Subscribe to successful get pending jobs responses.
Parameters:
- request (GetPendingJobExecutionsSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called with pending jobs response
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""
def subscribe_to_get_pending_job_executions_rejected(self, request, qos, callback):
"""
Subscribe to rejected get pending jobs responses.
Parameters:
- request (GetPendingJobExecutionsSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when request is rejected
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""def publish_start_next_pending_job_execution(self, request, qos):
"""
Publish request to start the next pending job execution.
Parameters:
- request (StartNextPendingJobExecutionRequest): Start job request
- qos (awscrt.mqtt.QoS): Quality of service
Returns:
Future: Future that completes when request is published
"""
def subscribe_to_start_next_pending_job_execution_accepted(self, request, qos, callback):
"""
Subscribe to successful start next job responses.
Parameters:
- request (StartNextPendingJobExecutionSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when job start succeeds
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""
def subscribe_to_start_next_pending_job_execution_rejected(self, request, qos, callback):
"""
Subscribe to rejected start next job responses.
"""def publish_describe_job_execution(self, request, qos):
"""
Publish request to describe a specific job execution.
Parameters:
- request (DescribeJobExecutionRequest): Describe job request
- qos (awscrt.mqtt.QoS): Quality of service
Returns:
Future: Future that completes when request is published
"""
def subscribe_to_describe_job_execution_accepted(self, request, qos, callback):
"""
Subscribe to successful describe job responses.
"""
def subscribe_to_describe_job_execution_rejected(self, request, qos, callback):
"""
Subscribe to rejected describe job responses.
"""def publish_update_job_execution(self, request, qos):
"""
Publish request to update job execution status.
Parameters:
- request (UpdateJobExecutionRequest): Update job request
- qos (awscrt.mqtt.QoS): Quality of service
Returns:
Future: Future that completes when request is published
"""
def subscribe_to_update_job_execution_accepted(self, request, qos, callback):
"""
Subscribe to successful update job responses.
"""
def subscribe_to_update_job_execution_rejected(self, request, qos, callback):
"""
Subscribe to rejected update job responses.
"""def subscribe_to_job_executions_changed_events(self, request, qos, callback):
"""
Subscribe to job executions changed events.
Parameters:
- request (JobExecutionsChangedSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when job executions change
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""def subscribe_to_next_job_execution_changed_events(self, request, qos, callback):
"""
Subscribe to next job execution changed events.
Parameters:
- request (NextJobExecutionChangedSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when next job execution changes
Returns:
Tuple[Future, str]: Future for subscription, topic string
"""The V2 client provides Future-based operations with request-response semantics.
class IotJobsClientV2:
"""
V2 client for AWS IoT Jobs service with Future-based operations.
"""
def __init__(self, connection): ...def get_pending_job_executions(self, request):
"""
Get list of pending job executions using request-response pattern.
Parameters:
- request (GetPendingJobExecutionsRequest): Request for pending jobs
Returns:
Future[GetPendingJobExecutionsResponse]: Future containing pending jobs response
"""
def start_next_pending_job_execution(self, request):
"""
Start the next pending job execution using request-response pattern.
Parameters:
- request (StartNextPendingJobExecutionRequest): Start job request
Returns:
Future[StartNextJobExecutionResponse]: Future containing job start response
"""
def describe_job_execution(self, request):
"""
Describe a specific job execution using request-response pattern.
Parameters:
- request (DescribeJobExecutionRequest): Describe job request
Returns:
Future[DescribeJobExecutionResponse]: Future containing job description
"""
def update_job_execution(self, request):
"""
Update job execution status using request-response pattern.
Parameters:
- request (UpdateJobExecutionRequest): Update job request
Returns:
Future[UpdateJobExecutionResponse]: Future containing update response
"""def create_job_executions_changed_stream(self, request, options):
"""
Create streaming operation for job executions changed events.
Parameters:
- request (JobExecutionsChangedSubscriptionRequest): Stream request
- options (ServiceStreamOptions): Stream configuration
Returns:
StreamingOperation: Streaming operation handle
"""
def create_next_job_execution_changed_stream(self, request, options):
"""
Create streaming operation for next job execution changed events.
Parameters:
- request (NextJobExecutionChangedSubscriptionRequest): Stream request
- options (ServiceStreamOptions): Stream configuration
Returns:
StreamingOperation: Streaming operation handle
"""@dataclass
class GetPendingJobExecutionsRequest:
"""Request to get pending job executions."""
thing_name: str
@dataclass
class StartNextPendingJobExecutionRequest:
"""Request to start next pending job execution."""
thing_name: str
step_timeout_in_minutes: Optional[int] = None
@dataclass
class DescribeJobExecutionRequest:
"""Request to describe a job execution."""
thing_name: str
job_id: str
execution_number: Optional[int] = None
include_job_document: Optional[bool] = None
@dataclass
class UpdateJobExecutionRequest:
"""Request to update job execution status."""
thing_name: str
job_id: str
status: str # JobStatus constant
status_details: Optional[Dict[str, str]] = None
step_timeout_in_minutes: Optional[int] = None
expected_version: Optional[int] = None
execution_number: Optional[int] = None
include_job_execution_state: Optional[bool] = None
include_job_document: Optional[bool] = None@dataclass
class GetPendingJobExecutionsResponse:
"""Response from get pending job executions."""
in_progress_jobs: Optional[List[JobExecutionSummary]] = None
queued_jobs: Optional[List[JobExecutionSummary]] = None
timestamp: Optional[datetime.datetime] = None
client_token: Optional[str] = None
@dataclass
class StartNextJobExecutionResponse:
"""Response from start next job execution."""
execution: Optional[JobExecutionData] = None
timestamp: Optional[datetime.datetime] = None
client_token: Optional[str] = None
@dataclass
class DescribeJobExecutionResponse:
"""Response from describe job execution."""
execution: Optional[JobExecutionData] = None
timestamp: Optional[datetime.datetime] = None
client_token: Optional[str] = None
@dataclass
class UpdateJobExecutionResponse:
"""Response from update job execution."""
execution_state: Optional[JobExecutionState] = None
job_document: Optional[Dict[str, Any]] = None
timestamp: Optional[datetime.datetime] = None
client_token: Optional[str] = None@dataclass
class JobExecutionData:
"""Complete job execution data."""
job_id: Optional[str] = None
thing_name: Optional[str] = None
job_document: Optional[Dict[str, Any]] = None
status: Optional[str] = None
status_details: Optional[Dict[str, str]] = None
queued_at: Optional[datetime.datetime] = None
started_at: Optional[datetime.datetime] = None
last_updated_at: Optional[datetime.datetime] = None
version_number: Optional[int] = None
execution_number: Optional[int] = None
@dataclass
class JobExecutionState:
"""Job execution state information."""
status: Optional[str] = None
status_details: Optional[Dict[str, str]] = None
version_number: Optional[int] = None
@dataclass
class JobExecutionSummary:
"""Summary of job execution."""
job_id: Optional[str] = None
thing_name: Optional[str] = None
version_number: Optional[int] = None
execution_number: Optional[int] = None
queued_at: Optional[datetime.datetime] = None
started_at: Optional[datetime.datetime] = None
last_updated_at: Optional[datetime.datetime] = None@dataclass
class JobExecutionsChangedEvent:
"""Event when job executions change."""
jobs: Optional[Dict[str, List[JobExecutionSummary]]] = None
timestamp: Optional[datetime.datetime] = None
@dataclass
class NextJobExecutionChangedEvent:
"""Event when next job execution changes."""
execution: Optional[JobExecutionData] = None
timestamp: Optional[datetime.datetime] = None@dataclass
class GetPendingJobExecutionsSubscriptionRequest:
"""Subscription request for get pending jobs responses."""
thing_name: str
@dataclass
class JobExecutionsChangedSubscriptionRequest:
"""Subscription request for job executions changed events."""
thing_name: str
@dataclass
class NextJobExecutionChangedSubscriptionRequest:
"""Subscription request for next job execution changed events."""
thing_name: str@dataclass
class RejectedError:
"""Error response from jobs operations."""
code: Optional[str] = None # RejectedErrorCode constant
message: Optional[str] = None
timestamp: Optional[datetime.datetime] = None
client_token: Optional[str] = Noneclass JobStatus:
"""Job execution status constants."""
CANCELED = "CANCELED"
FAILED = "FAILED"
QUEUED = "QUEUED"
IN_PROGRESS = "IN_PROGRESS"
SUCCEEDED = "SUCCEEDED"
TIMED_OUT = "TIMED_OUT"
REJECTED = "REJECTED"
REMOVED = "REMOVED"class RejectedErrorCode:
"""Error code constants for rejected operations."""
INTERNAL_ERROR = "InternalError"
INVALID_JSON = "InvalidJson"
INVALID_REQUEST = "InvalidRequest"
INVALID_STATE_TRANSITION = "InvalidStateTransition"
RESOURCE_NOT_FOUND = "ResourceNotFound"
VERSION_MISMATCH = "VersionMismatch"
INVALID_TOPIC = "InvalidTopic"
REQUEST_THROTTLED = "RequestThrottled"
TERMINAL_STATE_REACHED = "TerminalStateReached"from awsiot import mqtt_connection_builder, iotjobs
from awscrt import mqtt
import json
import time
# Create connection and client
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/certificate.pem.crt",
pri_key_filepath="/path/to/private.pem.key",
client_id="job-processor-123"
)
jobs_client = iotjobs.IotJobsClient(connection)
connection.connect().result()
# Track current job
current_job = None
def on_start_job_accepted(response):
"""Handle successful job start."""
global current_job
if response.execution:
current_job = response.execution
print(f"Started job: {current_job.job_id}")
print(f"Job document: {current_job.job_document}")
# Process the job (example: firmware update)
success = process_job(current_job)
# Update job status
status = iotjobs.JobStatus.SUCCEEDED if success else iotjobs.JobStatus.FAILED
update_request = iotjobs.UpdateJobExecutionRequest(
thing_name="MyDevice",
job_id=current_job.job_id,
status=status,
status_details={"result": "Job completed successfully" if success else "Job failed"}
)
jobs_client.publish_update_job_execution(update_request, mqtt.QoS.AT_LEAST_ONCE)
def on_start_job_rejected(error):
"""Handle job start rejection."""
print(f"Job start rejected: {error}")
def on_update_job_accepted(response):
"""Handle successful job update."""
print(f"Job update accepted: {response}")
global current_job
current_job = None
def on_update_job_rejected(error):
"""Handle job update rejection."""
print(f"Job update rejected: {error}")
def on_next_job_changed(event):
"""Handle next job execution changed events."""
if event.execution and not current_job:
print("New job available, starting...")
start_request = iotjobs.StartNextPendingJobExecutionRequest(
thing_name="MyDevice",
step_timeout_in_minutes=10
)
jobs_client.publish_start_next_pending_job_execution(start_request, mqtt.QoS.AT_LEAST_ONCE)
def process_job(job_execution):
"""Process the job - implement your job logic here."""
print(f"Processing job: {job_execution.job_id}")
# Example job processing based on job document
job_doc = job_execution.job_document
if job_doc.get("operation") == "firmware_update":
firmware_url = job_doc.get("firmware_url")
version = job_doc.get("version")
print(f"Updating firmware to version {version} from {firmware_url}")
# Simulate firmware update
time.sleep(5)
return True
elif job_doc.get("operation") == "config_update":
config = job_doc.get("config")
print(f"Updating configuration: {config}")
# Simulate config update
time.sleep(2)
return True
return False
# Subscribe to job responses
jobs_client.subscribe_to_start_next_pending_job_execution_accepted(
iotjobs.StartNextPendingJobExecutionSubscriptionRequest(thing_name="MyDevice"),
mqtt.QoS.AT_LEAST_ONCE,
on_start_job_accepted
).result()
jobs_client.subscribe_to_start_next_pending_job_execution_rejected(
iotjobs.StartNextPendingJobExecutionSubscriptionRequest(thing_name="MyDevice"),
mqtt.QoS.AT_LEAST_ONCE,
on_start_job_rejected
).result()
jobs_client.subscribe_to_update_job_execution_accepted(
iotjobs.UpdateJobExecutionSubscriptionRequest(thing_name="MyDevice"),
mqtt.QoS.AT_LEAST_ONCE,
on_update_job_accepted
).result()
jobs_client.subscribe_to_update_job_execution_rejected(
iotjobs.UpdateJobExecutionSubscriptionRequest(thing_name="MyDevice"),
mqtt.QoS.AT_LEAST_ONCE,
on_update_job_rejected
).result()
# Subscribe to job change events
jobs_client.subscribe_to_next_job_execution_changed_events(
iotjobs.NextJobExecutionChangedSubscriptionRequest(thing_name="MyDevice"),
mqtt.QoS.AT_LEAST_ONCE,
on_next_job_changed
).result()
# Check for initial pending jobs
start_request = iotjobs.StartNextPendingJobExecutionRequest(
thing_name="MyDevice",
step_timeout_in_minutes=10
)
jobs_client.publish_start_next_pending_job_execution(start_request, mqtt.QoS.AT_LEAST_ONCE)
print("Job processor started. Waiting for jobs...")from awsiot import mqtt_connection_builder, iotjobs
import asyncio
async def job_processor():
# Create connection
connection = mqtt_connection_builder.mtls_from_path(
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
cert_filepath="/path/to/certificate.pem.crt",
pri_key_filepath="/path/to/private.pem.key",
client_id="jobs-v2-client"
)
# Create V2 client
jobs_client = iotjobs.IotJobsClientV2(connection)
await connection.connect()
try:
while True:
# Get pending jobs
pending_request = iotjobs.GetPendingJobExecutionsRequest(thing_name="MyDevice")
pending_response = await jobs_client.get_pending_job_executions(pending_request)
if pending_response.queued_jobs:
print(f"Found {len(pending_response.queued_jobs)} pending jobs")
# Start next job
start_request = iotjobs.StartNextPendingJobExecutionRequest(
thing_name="MyDevice",
step_timeout_in_minutes=15
)
start_response = await jobs_client.start_next_pending_job_execution(start_request)
if start_response.execution:
job = start_response.execution
print(f"Started job: {job.job_id}")
# Process job
success = await process_job_async(job)
# Update job status
status = iotjobs.JobStatus.SUCCEEDED if success else iotjobs.JobStatus.FAILED
update_request = iotjobs.UpdateJobExecutionRequest(
thing_name="MyDevice",
job_id=job.job_id,
status=status,
status_details={"timestamp": str(datetime.now())}
)
update_response = await jobs_client.update_job_execution(update_request)
print(f"Job {job.job_id} completed with status: {status}")
else:
print("No pending jobs, waiting...")
await asyncio.sleep(30)
except iotjobs.V2ServiceException as e:
print(f"Jobs operation failed: {e.message}")
if e.modeled_error:
print(f"Error details: {e.modeled_error}")
finally:
await connection.disconnect()
async def process_job_async(job_execution):
"""Async job processing."""
print(f"Processing job: {job_execution.job_id}")
# Simulate async job processing
await asyncio.sleep(3)
return True
# Run async job processor
asyncio.run(job_processor())from awsiot import iotjobs
def on_job_changed(event):
"""Handle job executions changed events."""
print(f"Job executions changed: {event}")
if event.jobs:
for status, job_list in event.jobs.items():
print(f" {status}: {len(job_list)} jobs")
def on_next_job_changed(event):
"""Handle next job execution changed events."""
print(f"Next job changed: {event}")
if event.execution:
print(f" Next job: {event.execution.job_id}")
# Create stream options
stream_options = iotjobs.ServiceStreamOptions(
incoming_event_listener=on_job_changed
)
next_job_stream_options = iotjobs.ServiceStreamOptions(
incoming_event_listener=on_next_job_changed
)
# Create V2 client and streams
jobs_client = iotjobs.IotJobsClientV2(connection)
job_stream = jobs_client.create_job_executions_changed_stream(
iotjobs.JobExecutionsChangedSubscriptionRequest(thing_name="MyDevice"),
stream_options
)
next_job_stream = jobs_client.create_next_job_execution_changed_stream(
iotjobs.NextJobExecutionChangedSubscriptionRequest(thing_name="MyDevice"),
next_job_stream_options
)Install with Tessl CLI
npx tessl i tessl/pypi-awsiotsdk