0
# AWS IoT Jobs
1
2
Manage job executions across a device fleet: subscribe to job topics, query pending jobs, start executions, report progress, and update execution status through the job lifecycle. AWS IoT Jobs lets you define a set of remote operations that are sent to, and executed on, one or more devices connected to AWS IoT.
3
4
## Capabilities
5
6
### Jobs Client Creation
7
8
Create a specialized MQTT client for AWS IoT Jobs operations with thing-specific configuration and job management capabilities.
9
10
```python { .api }
11
class AWSIoTMQTTThingJobsClient:
    """MQTT client specialised for the AWS IoT Jobs topics of a single thing."""

    def __init__(self, clientID: str, thingName: str, QoS: int = 0, protocolType: int = MQTTv3_1_1, useWebsocket: bool = False, cleanSession: bool = True, awsIoTMQTTClient = None):
        """
        Create AWS IoT MQTT Jobs client for thing-specific job operations.

        Args:
            clientID (str): Client identifier for MQTT connection and job tokens
            thingName (str): AWS IoT thing name for job topic routing
            QoS (int): Default QoS level for all job operations (0 or 1)
            protocolType (int): MQTT version (MQTTv3_1=3, MQTTv3_1_1=4)
            useWebsocket (bool): Enable MQTT over WebSocket SigV4
            cleanSession (bool): Start with clean session state
            awsIoTMQTTClient (AWSIoTMQTTClient): Existing MQTT client to reuse (optional)
        """
25
```
26
27
### Job Subscriptions
28
29
Subscribe to various job-related topics for monitoring job state changes and receiving job notifications.
30
31
```python { .api }
32
def createJobSubscription(self, callback: callable, jobExecutionType = None, jobReplyType = None, jobId: str = None) -> bool:
    """
    Subscribe to job-related topic synchronously.

    Args:
        callback (callable): Message callback (client, userdata, message) -> None
        jobExecutionType: Topic type from jobExecutionTopicType enum
        jobReplyType: Reply type from jobExecutionTopicReplyType enum
        jobId (str): Job ID for topic types that carry one (describe, update).
            Pass '+' to match every job ID on such topics; leave as None for
            topic types that have no job ID segment.

    Returns:
        bool: True if subscription successful, False otherwise

    NOTE(review): the SDK itself reportedly defaults jobExecutionType to
    JOB_WILDCARD_TOPIC and jobReplyType to JOB_REQUEST_TYPE rather than
    None -- confirm against the installed SDK version.
    """
45
46
def createJobSubscriptionAsync(self, ackCallback: callable, callback: callable, jobExecutionType = None, jobReplyType = None, jobId: str = None) -> int:
    """
    Subscribe to job-related topic asynchronously.

    Args:
        ackCallback (callable): SUBACK callback (mid, data) -> None
        callback (callable): Message callback (client, userdata, message) -> None
        jobExecutionType: Topic type from jobExecutionTopicType enum
        jobReplyType: Reply type from jobExecutionTopicReplyType enum
        jobId (str): Job ID for topic types that carry one (describe, update).
            Pass '+' to match every job ID on such topics; leave as None for
            topic types that have no job ID segment.

    Returns:
        int: Packet ID for tracking in callback

    NOTE(review): the SDK itself reportedly defaults jobExecutionType to
    JOB_WILDCARD_TOPIC and jobReplyType to JOB_REQUEST_TYPE rather than
    None -- confirm against the installed SDK version.
    """
60
```
61
62
### Job Queries
63
64
Query job information including pending jobs list and specific job descriptions.
65
66
```python { .api }
67
def sendJobsQuery(self, jobExecTopicType, jobId: str = None) -> bool:
    """
    Send job query request.

    The reply is delivered on the matching accepted/rejected reply topic,
    so subscribe to that topic before sending the query.

    Args:
        jobExecTopicType: Query type from jobExecutionTopicType enum
        jobId (str): Job ID for specific queries, or None for general queries

    Returns:
        bool: True if query sent successfully, False otherwise
    """
78
79
def sendJobsDescribe(self, jobId: str, executionNumber: int = 0, includeJobDocument: bool = True) -> bool:
    """
    Request description of specific job execution.

    Args:
        jobId (str): Job ID to describe (can be '$next' for next pending job)
        executionNumber (int): Specific execution number (0 for latest)
        includeJobDocument (bool): Include job document in response

    Returns:
        bool: True if describe request sent, False otherwise
    """
91
```
92
93
### Job Execution Management
94
95
Start, update, and manage job execution lifecycle and status reporting.
96
97
```python { .api }
98
def sendJobsStartNext(self, statusDetails: dict = None, stepTimeoutInMinutes: int = None) -> bool:
    """
    Start next pending job execution.

    Args:
        statusDetails (dict): Optional status details for job start
        stepTimeoutInMinutes (int): Timeout for job step execution

    Returns:
        bool: True if start request sent, False otherwise
    """
109
110
def sendJobsUpdate(self, jobId: str, status: tuple, statusDetails: dict = None, expectedVersion: int = 0, executionNumber: int = 0, includeJobExecutionState: bool = False, includeJobDocument: bool = False, stepTimeoutInMinutes: int = None) -> bool:
    """
    Update job execution status and progress.

    Args:
        jobId (str): Job ID to update
        status (tuple): New job status; pass the jobExecutionStatus member
            itself (e.g. jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS), not
            one of its elements
        statusDetails (dict): Optional status details and progress information
        expectedVersion (int): Expected current version for optimistic locking
        executionNumber (int): Execution number (0 for latest)
        includeJobExecutionState (bool): Include execution state in response
        includeJobDocument (bool): Include job document in response
        stepTimeoutInMinutes (int): Timeout for next job step

    Returns:
        bool: True if update sent successfully, False otherwise
    """
127
```
128
129
## Job Topic Types and Constants
130
131
```python { .api }
132
# Job execution topic types
133
class jobExecutionTopicType:
    """Job topic kinds.

    Each member is a tuple of (numeric id, flag, topic operation segment).
    The flag is True only for the describe and update topics, i.e. the
    ones addressed to a specific job ID.
    """

    JOB_UNRECOGNIZED_TOPIC = (0, False, '')
    JOB_GET_PENDING_TOPIC = (1, False, 'get')  # Get list of pending jobs
    JOB_START_NEXT_TOPIC = (2, False, 'start-next')  # Start next pending job
    JOB_DESCRIBE_TOPIC = (3, True, 'get')  # Describe specific job
    JOB_UPDATE_TOPIC = (4, True, 'update')  # Update job status
    JOB_NOTIFY_TOPIC = (5, False, 'notify')  # Job notifications
    JOB_NOTIFY_NEXT_TOPIC = (6, False, 'notify-next')  # Next job notifications
    JOB_WILDCARD_TOPIC = (7, False, '+')  # Wildcard subscription
142
143
# Job execution reply topic types
144
class jobExecutionTopicReplyType:
    """Reply-topic kinds; each member is (numeric id, topic suffix)."""

    JOB_UNRECOGNIZED_TOPIC_TYPE = (0, '')
    JOB_REQUEST_TYPE = (1, '')  # Request topic (no suffix)
    JOB_ACCEPTED_REPLY_TYPE = (2, '/accepted')  # Accepted response topic
    JOB_REJECTED_REPLY_TYPE = (3, '/rejected')  # Rejected response topic
    JOB_WILDCARD_REPLY_TYPE = (4, '/#')  # Wildcard reply subscription
150
151
# Job execution status values
152
class jobExecutionStatus:
    """Job execution states; each member is (numeric id, status string).

    The status string is None for the two placeholder members that do not
    correspond to a named status.
    """

    JOB_EXECUTION_STATUS_NOT_SET = (0, None)
    JOB_EXECUTION_QUEUED = (1, 'QUEUED')  # Job is queued for execution
    JOB_EXECUTION_IN_PROGRESS = (2, 'IN_PROGRESS')  # Job is currently executing
    JOB_EXECUTION_FAILED = (3, 'FAILED')  # Job execution failed
    JOB_EXECUTION_SUCCEEDED = (4, 'SUCCEEDED')  # Job completed successfully
    JOB_EXECUTION_CANCELED = (5, 'CANCELED')  # Job was canceled
    JOB_EXECUTION_REJECTED = (6, 'REJECTED')  # Job was rejected
    JOB_EXECUTION_UNKNOWN_STATUS = (99, None)  # Unknown status
161
```
162
163
## Usage Examples
164
165
### Basic Job Monitoring
166
167
```python
168
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
import json
import time

# Create jobs client
jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("myJobsClient", "myThingName")
jobsClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
jobsClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
jobsClient.connect()


# Job notification callback
def jobNotificationCallback(client, userdata, message):
    """Print each job message and, when present, its execution summary."""
    text = message.payload.decode()
    print(f"Job notification on {message.topic}: {text}")
    payload = json.loads(text)

    # Only some messages carry an "execution" object (e.g. notify-next while
    # a job is pending); others are printed raw above and skipped here.
    if "execution" in payload:
        execution = payload["execution"]
        print(f"Job {execution['jobId']} status: {execution['status']}")


# Subscribe to job notifications
jobsClient.createJobSubscription(
    jobNotificationCallback,
    jobExecutionTopicType.JOB_NOTIFY_TOPIC,
)

# Subscribe to next job notifications
jobsClient.createJobSubscription(
    jobNotificationCallback,
    jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC,
)

# The answer to the pending-jobs query arrives on the "get/accepted" reply
# topic, so subscribe to it before sending the query.
jobsClient.createJobSubscription(
    jobNotificationCallback,
    jobExecutionTopicType.JOB_GET_PENDING_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE,
)

# Query pending jobs
jobsClient.sendJobsQuery(jobExecutionTopicType.JOB_GET_PENDING_TOPIC)

# Keep the process alive; messages are delivered on background threads and
# nothing would be received if the script ended here.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    jobsClient.disconnect()
203
```
204
205
### Job Execution Workflow
206
207
```python
208
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus
import json
import threading
import time

# Create jobs client
jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("deviceJobExecutor", "myDevice")
jobsClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
jobsClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
jobsClient.connect()

current_job_id = None


# Job response callback
def jobResponseCallback(client, userdata, message):
    """Handle accepted replies; start the job when one is handed out."""
    global current_job_id
    print(f"Job response on {message.topic}: {message.payload.decode()}")

    payload = json.loads(message.payload.decode())

    # Handle start-next response
    if "execution" in payload and payload["execution"]["jobId"]:
        current_job_id = payload["execution"]["jobId"]
        job_document = payload["execution"]["jobDocument"]

        print(f"Started job {current_job_id}")
        print(f"Job document: {job_document}")

        # Run the job on its own thread: the callback executes on the SDK's
        # message-dispatch thread, and sleeping/publishing there would stall
        # delivery of every other message until the job finished.
        threading.Thread(
            target=execute_job,
            args=(current_job_id, job_document),
        ).start()


def execute_job(job_id, job_document):
    """Execute the job and report progress"""
    try:
        print(f"Executing job {job_id}...")

        # Update job status to IN_PROGRESS.
        # Pass the jobExecutionStatus member itself; the SDK reads the
        # status string out of the tuple.
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS,
            {"progress": "starting execution"},
        )

        # Simulate job execution steps
        for step in range(1, 4):
            print(f"Executing step {step}...")
            time.sleep(2)  # Simulate work

            # Report progress
            jobsClient.sendJobsUpdate(
                job_id,
                jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS,
                {"progress": f"completed step {step}/3"},
            )

        # Job completed successfully
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_SUCCEEDED,
            {"result": "job completed successfully"},
        )

        print(f"Job {job_id} completed successfully")

    except Exception as e:
        # Job failed
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_FAILED,
            {"error": str(e)},
        )
        print(f"Job {job_id} failed: {e}")


# Subscribe to job responses
jobsClient.createJobSubscription(
    jobResponseCallback,
    jobExecutionTopicType.JOB_START_NEXT_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE,
)

# The update topic is per-job, so a job ID is required; '+' matches any job.
jobsClient.createJobSubscription(
    jobResponseCallback,
    jobExecutionTopicType.JOB_UPDATE_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE,
    "+",
)

# Start next pending job
print("Starting next pending job...")
jobsClient.sendJobsStartNext({"device": "ready for work"})

# Keep running to process jobs
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    jobsClient.disconnect()
306
```
307
308
### Advanced Job Management
309
310
```python
311
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType
from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus
import json
import threading
import time

# Create jobs client with higher QoS
jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("advancedJobClient", "myDevice", QoS=1)
jobsClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
jobsClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
jobsClient.connect()


# Advanced job callback with detailed handling
def advancedJobCallback(client, userdata, message):
    """Dispatch accepted/rejected job replies."""
    topic = message.topic
    payload = json.loads(message.payload.decode())

    print(f"Received job message on {topic}")

    if "/accepted" in topic:
        if "execution" in payload:
            execution = payload["execution"]
            job_id = execution["jobId"]
            status = execution["status"]
            version = execution["versionNumber"]

            print(f"Job {job_id} accepted - Status: {status}, Version: {version}")

            # Handle job document updates with versioning
            if "jobDocument" in execution:
                # With QoS 1 each update blocks until its PUBACK is handled,
                # and acks are dispatched on the same thread that runs this
                # callback; publishing from here would time out. Process the
                # job on a separate thread instead.
                threading.Thread(
                    target=process_job_with_version,
                    args=(job_id, execution["jobDocument"], version),
                ).start()

    elif "/rejected" in topic:
        error_code = payload.get("code", "Unknown")
        error_message = payload.get("message", "No details")
        print(f"Job operation rejected - Code: {error_code}, Message: {error_message}")


def process_job_with_version(job_id, job_document, expected_version):
    """Process job with version control for concurrent updates"""
    # Version to send with the next update; advanced after every update this
    # function issues so the failure path does not reuse a stale number.
    next_version = expected_version
    try:
        # Process job document
        operation = job_document.get("operation", "unknown")
        parameters = job_document.get("parameters", {})

        print(f"Processing {operation} with parameters: {parameters}")

        # Update with expected version for optimistic locking.
        # Pass the jobExecutionStatus member itself, not one of its elements.
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS,
            statusDetails={"operation": operation, "stage": "processing"},
            expectedVersion=next_version,
            includeJobExecutionState=True,
        )
        next_version += 1  # Version incremented by previous update

        # Simulate processing time
        time.sleep(3)

        # Complete with final status
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_SUCCEEDED,
            statusDetails={"operation": operation, "result": "completed"},
            expectedVersion=next_version,
            includeJobDocument=False,
        )

    except Exception as e:
        jobsClient.sendJobsUpdate(
            job_id,
            jobExecutionStatus.JOB_EXECUTION_FAILED,
            statusDetails={"error": str(e)},
            expectedVersion=next_version,
        )


# Subscribe to multiple job topics with different callbacks
jobsClient.createJobSubscription(
    advancedJobCallback,
    jobExecutionTopicType.JOB_START_NEXT_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE,
)

# Update topics are per-job, so a job ID is required; '+' matches any job.
jobsClient.createJobSubscription(
    advancedJobCallback,
    jobExecutionTopicType.JOB_UPDATE_TOPIC,
    jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE,
    "+",
)

jobsClient.createJobSubscription(
    advancedJobCallback,
    jobExecutionTopicType.JOB_UPDATE_TOPIC,
    jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE,
    "+",
)

# Query specific job details.
# NOTE: the reply is published on this job's "get/accepted" topic, which is
# not subscribed above; add a subscription for it to see the description.
specific_job_id = "job-12345"
jobsClient.sendJobsDescribe(
    specific_job_id,
    executionNumber=0,
    includeJobDocument=True,
)

# Start next job with timeout
jobsClient.sendJobsStartNext(
    statusDetails={"device_info": "ready", "capabilities": ["firmware_update", "config_change"]},
    stepTimeoutInMinutes=30,
)

# Keep running so replies can be received and jobs processed
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    jobsClient.disconnect()
419
```
420
421
## Types
422
423
```python { .api }
424
# Job execution callback signature
425
def jobCallback(client, userdata: dict, message) -> None:
    """
    Job operation callback.

    Args:
        client: MQTT client instance
        userdata (dict): User data passed to callback
        message: MQTT message with .topic and .payload attributes
    """
434
435
# Job document structure (example)
436
job_document = {
    "operation": "firmware_update",
    "parameters": {
        "firmware_url": "https://example.com/firmware.bin",
        "version": "1.2.3",
        "checksum": "sha256:abc123...",
    },
    "timeout": 3600,
}
445
446
# Job execution response structure (example)
447
job_execution_response = {
    "execution": {
        "jobId": "job-12345",
        "status": "IN_PROGRESS",
        "statusDetails": {
            "progress": "50%",
            "step": "downloading",
        },
        "queuedAt": 1609459200,
        "startedAt": 1609459210,
        "lastUpdatedAt": 1609459250,
        "versionNumber": 2,
        "executionNumber": 1,
        "jobDocument": {
            "operation": "firmware_update",
            "parameters": {...},  # placeholder: job-specific parameters elided
        },
    },
    "timestamp": 1609459250,
    "clientToken": "token-abc-123",
}
468
```