0
# IoT Jobs Management
1
2
Job execution capabilities for AWS IoT device management: job discovery, execution updates, status reporting, and real-time job notifications. Both pending and in-progress jobs can be handled through either the V1 (callback-based) or the V2 (Future-based) client interface.
3
4
## Capabilities
5
6
### V1 Jobs Client (Callback-Based)
7
8
The V1 client provides callback-based operations following the traditional publish/subscribe pattern.
9
10
#### Client Creation
11
12
```python { .api }
13
class IotJobsClient(MqttServiceClient):
14
"""
15
V1 client for AWS IoT Jobs service with callback-based operations.
16
17
Parameters:
18
- mqtt_connection: MQTT connection (mqtt.Connection or mqtt5.Client)
19
"""
20
def __init__(self, mqtt_connection): ...
21
```
22
23
Usage example:
24
25
```python
26
from awsiot import mqtt_connection_builder, iotjobs
27
28
# Create MQTT connection
29
connection = mqtt_connection_builder.mtls_from_path(
30
endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
31
cert_filepath="/path/to/certificate.pem.crt",
32
pri_key_filepath="/path/to/private.pem.key",
33
client_id="jobs-client-123"
34
)
35
36
# Create jobs client
37
jobs_client = iotjobs.IotJobsClient(connection)
38
```
39
40
#### Job Discovery and Execution
41
42
##### Get Pending Job Executions
43
44
```python { .api }
45
def publish_get_pending_job_executions(self, request, qos):
46
"""
47
Publish request to get list of pending job executions.
48
49
Parameters:
50
- request (GetPendingJobExecutionsRequest): Request for pending jobs
51
- qos (awscrt.mqtt.QoS): Quality of service
52
53
Returns:
54
Future: Future that completes when request is published
55
"""
56
57
def subscribe_to_get_pending_job_executions_accepted(self, request, qos, callback):
58
"""
59
Subscribe to successful get pending jobs responses.
60
61
Parameters:
62
- request (GetPendingJobExecutionsSubscriptionRequest): Subscription request
63
- qos (awscrt.mqtt.QoS): Quality of service
64
- callback: Function called with pending jobs response
65
66
Returns:
67
Tuple[Future, str]: Future for subscription, topic string
68
"""
69
70
def subscribe_to_get_pending_job_executions_rejected(self, request, qos, callback):
71
"""
72
Subscribe to rejected get pending jobs responses.
73
74
Parameters:
75
- request (GetPendingJobExecutionsSubscriptionRequest): Subscription request
76
- qos (awscrt.mqtt.QoS): Quality of service
77
- callback: Function called when request is rejected
78
79
Returns:
80
Tuple[Future, str]: Future for subscription, topic string
81
"""
82
```
83
84
##### Start Next Pending Job Execution
85
86
```python { .api }
87
def publish_start_next_pending_job_execution(self, request, qos):
88
"""
89
Publish request to start the next pending job execution.
90
91
Parameters:
92
- request (StartNextPendingJobExecutionRequest): Start job request
93
- qos (awscrt.mqtt.QoS): Quality of service
94
95
Returns:
96
Future: Future that completes when request is published
97
"""
98
99
def subscribe_to_start_next_pending_job_execution_accepted(self, request, qos, callback):
100
"""
101
Subscribe to successful start next job responses.
102
103
Parameters:
104
- request (StartNextPendingJobExecutionSubscriptionRequest): Subscription request
105
- qos (awscrt.mqtt.QoS): Quality of service
106
- callback: Function called when job start succeeds
107
108
Returns:
109
Tuple[Future, str]: Future for subscription, topic string
110
"""
111
112
def subscribe_to_start_next_pending_job_execution_rejected(self, request, qos, callback):
113
"""
114
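Subscribe to rejected start next job responses.

Parameters:
- request (StartNextPendingJobExecutionSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when job start is rejected

Returns:
Tuple[Future, str]: Future for subscription, topic string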
115
"""
116
```
117
118
##### Describe Job Execution
119
120
```python { .api }
121
def publish_describe_job_execution(self, request, qos):
122
"""
123
Publish request to describe a specific job execution.
124
125
Parameters:
126
- request (DescribeJobExecutionRequest): Describe job request
127
- qos (awscrt.mqtt.QoS): Quality of service
128
129
Returns:
130
Future: Future that completes when request is published
131
"""
132
133
def subscribe_to_describe_job_execution_accepted(self, request, qos, callback):
134
"""
135
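Subscribe to successful describe job responses.

Parameters:
- request: Subscription request for describe job responses
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called with the job description response

Returns:
Tuple[Future, str]: Future for subscription, topic string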
136
"""
137
138
def subscribe_to_describe_job_execution_rejected(self, request, qos, callback):
139
"""
140
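Subscribe to rejected describe job responses.

Parameters:
- request: Subscription request for describe job responses
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when the describe request is rejected

Returns:
Tuple[Future, str]: Future for subscription, topic string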
141
"""
142
```
143
144
##### Update Job Execution
145
146
```python { .api }
147
def publish_update_job_execution(self, request, qos):
148
"""
149
Publish request to update job execution status.
150
151
Parameters:
152
- request (UpdateJobExecutionRequest): Update job request
153
- qos (awscrt.mqtt.QoS): Quality of service
154
155
Returns:
156
Future: Future that completes when request is published
157
"""
158
159
def subscribe_to_update_job_execution_accepted(self, request, qos, callback):
160
"""
161
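Subscribe to successful update job responses.

Parameters:
- request (UpdateJobExecutionSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when the job update succeeds

Returns:
Tuple[Future, str]: Future for subscription, topic string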
162
"""
163
164
def subscribe_to_update_job_execution_rejected(self, request, qos, callback):
165
"""
166
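Subscribe to rejected update job responses.

Parameters:
- request (UpdateJobExecutionSubscriptionRequest): Subscription request
- qos (awscrt.mqtt.QoS): Quality of service
- callback: Function called when the job update is rejected

Returns:
Tuple[Future, str]: Future for subscription, topic string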
167
"""
168
```
169
170
#### Job Event Subscriptions
171
172
##### Job Executions Changed Events
173
174
```python { .api }
175
def subscribe_to_job_executions_changed_events(self, request, qos, callback):
176
"""
177
Subscribe to job executions changed events.
178
179
Parameters:
180
- request (JobExecutionsChangedSubscriptionRequest): Subscription request
181
- qos (awscrt.mqtt.QoS): Quality of service
182
- callback: Function called when job executions change
183
184
Returns:
185
Tuple[Future, str]: Future for subscription, topic string
186
"""
187
```
188
189
##### Next Job Execution Changed Events
190
191
```python { .api }
192
def subscribe_to_next_job_execution_changed_events(self, request, qos, callback):
193
"""
194
Subscribe to next job execution changed events.
195
196
Parameters:
197
- request (NextJobExecutionChangedSubscriptionRequest): Subscription request
198
- qos (awscrt.mqtt.QoS): Quality of service
199
- callback: Function called when next job execution changes
200
201
Returns:
202
Tuple[Future, str]: Future for subscription, topic string
203
"""
204
```
205
206
### V2 Jobs Client (Future-Based)
207
208
The V2 client provides Future-based operations with request-response semantics.
209
210
#### Client Creation
211
212
```python { .api }
213
class IotJobsClientV2:
214
"""
215
V2 client for AWS IoT Jobs service with Future-based operations.
216
"""
217
def __init__(self, connection): ...
218
```
219
220
#### Job Operations
221
222
```python { .api }
223
def get_pending_job_executions(self, request):
224
"""
225
Get list of pending job executions using request-response pattern.
226
227
Parameters:
228
- request (GetPendingJobExecutionsRequest): Request for pending jobs
229
230
Returns:
231
Future[GetPendingJobExecutionsResponse]: Future containing pending jobs response
232
"""
233
234
def start_next_pending_job_execution(self, request):
235
"""
236
Start the next pending job execution using request-response pattern.
237
238
Parameters:
239
- request (StartNextPendingJobExecutionRequest): Start job request
240
241
Returns:
242
Future[StartNextJobExecutionResponse]: Future containing job start response
243
"""
244
245
def describe_job_execution(self, request):
246
"""
247
Describe a specific job execution using request-response pattern.
248
249
Parameters:
250
- request (DescribeJobExecutionRequest): Describe job request
251
252
Returns:
253
Future[DescribeJobExecutionResponse]: Future containing job description
254
"""
255
256
def update_job_execution(self, request):
257
"""
258
Update job execution status using request-response pattern.
259
260
Parameters:
261
- request (UpdateJobExecutionRequest): Update job request
262
263
Returns:
264
Future[UpdateJobExecutionResponse]: Future containing update response
265
"""
266
```
267
268
#### Streaming Operations
269
270
```python { .api }
271
def create_job_executions_changed_stream(self, request, options):
272
"""
273
Create streaming operation for job executions changed events.
274
275
Parameters:
276
- request (JobExecutionsChangedSubscriptionRequest): Stream request
277
- options (ServiceStreamOptions): Stream configuration
278
279
Returns:
280
StreamingOperation: Streaming operation handle
281
"""
282
283
def create_next_job_execution_changed_stream(self, request, options):
284
"""
285
Create streaming operation for next job execution changed events.
286
287
Parameters:
288
- request (NextJobExecutionChangedSubscriptionRequest): Stream request
289
- options (ServiceStreamOptions): Stream configuration
290
291
Returns:
292
StreamingOperation: Streaming operation handle
293
"""
294
```
295
296
### Data Model Classes
297
298
#### Request Classes
299
300
```python { .api }
301
@dataclass
302
class GetPendingJobExecutionsRequest:
303
"""Request to get pending job executions."""
304
thing_name: str
305
306
@dataclass
307
class StartNextPendingJobExecutionRequest:
308
"""Request to start next pending job execution."""
309
thing_name: str
310
step_timeout_in_minutes: Optional[int] = None
311
312
@dataclass
313
class DescribeJobExecutionRequest:
314
"""Request to describe a job execution."""
315
thing_name: str
316
job_id: str
317
execution_number: Optional[int] = None
318
include_job_document: Optional[bool] = None
319
320
@dataclass
321
class UpdateJobExecutionRequest:
322
"""Request to update job execution status."""
323
thing_name: str
324
job_id: str
325
status: str # JobStatus constant
326
status_details: Optional[Dict[str, str]] = None
327
step_timeout_in_minutes: Optional[int] = None
328
expected_version: Optional[int] = None
329
execution_number: Optional[int] = None
330
include_job_execution_state: Optional[bool] = None
331
include_job_document: Optional[bool] = None
332
```
333
334
#### Response Classes
335
336
```python { .api }
337
@dataclass
338
class GetPendingJobExecutionsResponse:
339
"""Response from get pending job executions."""
340
in_progress_jobs: Optional[List[JobExecutionSummary]] = None
341
queued_jobs: Optional[List[JobExecutionSummary]] = None
342
timestamp: Optional[datetime.datetime] = None
343
client_token: Optional[str] = None
344
345
@dataclass
346
class StartNextJobExecutionResponse:
347
"""Response from start next job execution."""
348
execution: Optional[JobExecutionData] = None
349
timestamp: Optional[datetime.datetime] = None
350
client_token: Optional[str] = None
351
352
@dataclass
353
class DescribeJobExecutionResponse:
354
"""Response from describe job execution."""
355
execution: Optional[JobExecutionData] = None
356
timestamp: Optional[datetime.datetime] = None
357
client_token: Optional[str] = None
358
359
@dataclass
360
class UpdateJobExecutionResponse:
361
"""Response from update job execution."""
362
execution_state: Optional[JobExecutionState] = None
363
job_document: Optional[Dict[str, Any]] = None
364
timestamp: Optional[datetime.datetime] = None
365
client_token: Optional[str] = None
366
```
367
368
#### Job Data Classes
369
370
```python { .api }
371
@dataclass
372
class JobExecutionData:
373
"""Complete job execution data."""
374
job_id: Optional[str] = None
375
thing_name: Optional[str] = None
376
job_document: Optional[Dict[str, Any]] = None
377
status: Optional[str] = None
378
status_details: Optional[Dict[str, str]] = None
379
queued_at: Optional[datetime.datetime] = None
380
started_at: Optional[datetime.datetime] = None
381
last_updated_at: Optional[datetime.datetime] = None
382
version_number: Optional[int] = None
383
execution_number: Optional[int] = None
384
385
@dataclass
386
class JobExecutionState:
387
"""Job execution state information."""
388
status: Optional[str] = None
389
status_details: Optional[Dict[str, str]] = None
390
version_number: Optional[int] = None
391
392
@dataclass
393
class JobExecutionSummary:
394
"""Summary of job execution."""
395
job_id: Optional[str] = None
396
thing_name: Optional[str] = None
397
version_number: Optional[int] = None
398
execution_number: Optional[int] = None
399
queued_at: Optional[datetime.datetime] = None
400
started_at: Optional[datetime.datetime] = None
401
last_updated_at: Optional[datetime.datetime] = None
402
```
403
404
#### Event Classes
405
406
```python { .api }
407
@dataclass
408
class JobExecutionsChangedEvent:
409
"""Event when job executions change."""
410
jobs: Optional[Dict[str, List[JobExecutionSummary]]] = None
411
timestamp: Optional[datetime.datetime] = None
412
413
@dataclass
414
class NextJobExecutionChangedEvent:
415
"""Event when next job execution changes."""
416
execution: Optional[JobExecutionData] = None
417
timestamp: Optional[datetime.datetime] = None
418
```
419
420
#### Subscription Request Classes
421
422
```python { .api }
423
@dataclass
424
class GetPendingJobExecutionsSubscriptionRequest:
425
"""Subscription request for get pending jobs responses."""
426
thing_name: str
427
428
@dataclass
429
class JobExecutionsChangedSubscriptionRequest:
430
"""Subscription request for job executions changed events."""
431
thing_name: str
432
433
@dataclass
434
class NextJobExecutionChangedSubscriptionRequest:
435
"""Subscription request for next job execution changed events."""
436
thing_name: str
437
```
438
439
#### Error Classes
440
441
```python { .api }
442
@dataclass
443
class RejectedError:
444
"""Error response from jobs operations."""
445
code: Optional[str] = None # RejectedErrorCode constant
446
message: Optional[str] = None
447
timestamp: Optional[datetime.datetime] = None
448
client_token: Optional[str] = None
449
```
450
451
### Constants
452
453
#### Job Status
454
455
```python { .api }
456
class JobStatus:
457
"""Job execution status constants."""
458
CANCELED = "CANCELED"
459
FAILED = "FAILED"
460
QUEUED = "QUEUED"
461
IN_PROGRESS = "IN_PROGRESS"
462
SUCCEEDED = "SUCCEEDED"
463
TIMED_OUT = "TIMED_OUT"
464
REJECTED = "REJECTED"
465
REMOVED = "REMOVED"
466
```
467
468
#### Rejected Error Codes
469
470
```python { .api }
471
class RejectedErrorCode:
472
"""Error code constants for rejected operations."""
473
INTERNAL_ERROR = "InternalError"
474
INVALID_JSON = "InvalidJson"
475
INVALID_REQUEST = "InvalidRequest"
476
INVALID_STATE_TRANSITION = "InvalidStateTransition"
477
RESOURCE_NOT_FOUND = "ResourceNotFound"
478
VERSION_MISMATCH = "VersionMismatch"
479
INVALID_TOPIC = "InvalidTopic"
480
REQUEST_THROTTLED = "RequestThrottled"
481
TERMINAL_STATE_REACHED = "TerminalStateReached"
482
```
483
484
## Usage Examples
485
486
### V1 Client - Basic Job Processing Loop
487
488
```python
489
from awsiot import mqtt_connection_builder, iotjobs
from awscrt import mqtt
import json
import time

# Thing whose jobs this script processes; shared by every request below.
THING_NAME = "MyDevice"

# Create connection and client
connection = mqtt_connection_builder.mtls_from_path(
    endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
    cert_filepath="/path/to/certificate.pem.crt",
    pri_key_filepath="/path/to/private.pem.key",
    client_id="job-processor-123"
)

jobs_client = iotjobs.IotJobsClient(connection)
connection.connect().result()

# Track current job (None while idle)
current_job = None


def request_next_job():
    """Publish a request to start the next pending job for THING_NAME."""
    start_request = iotjobs.StartNextPendingJobExecutionRequest(
        thing_name=THING_NAME,
        step_timeout_in_minutes=10
    )
    jobs_client.publish_start_next_pending_job_execution(start_request, mqtt.QoS.AT_LEAST_ONCE)


def on_start_job_accepted(response):
    """Handle a successful job start: run the job and publish its final status.

    Does nothing when the response carries no execution.
    """
    global current_job
    if not response.execution:
        return

    current_job = response.execution
    print(f"Started job: {current_job.job_id}")
    print(f"Job document: {current_job.job_document}")

    # Process the job (example: firmware update).
    # NOTE(review): this runs inside the SDK callback; for long-running jobs
    # consider handing the work to a worker thread — confirm against the
    # SDK's threading guidance.
    success = process_job(current_job)

    # Update job status
    status = iotjobs.JobStatus.SUCCEEDED if success else iotjobs.JobStatus.FAILED
    update_request = iotjobs.UpdateJobExecutionRequest(
        thing_name=THING_NAME,
        job_id=current_job.job_id,
        status=status,
        status_details={"result": "Job completed successfully" if success else "Job failed"}
    )
    jobs_client.publish_update_job_execution(update_request, mqtt.QoS.AT_LEAST_ONCE)


def on_start_job_rejected(error):
    """Handle job start rejection."""
    print(f"Job start rejected: {error}")


def on_update_job_accepted(response):
    """Handle successful job update and mark the processor idle again."""
    global current_job
    print(f"Job update accepted: {response}")
    current_job = None


def on_update_job_rejected(error):
    """Handle job update rejection."""
    global current_job
    print(f"Job update rejected: {error}")
    # Clear the marker so on_next_job_changed is not blocked forever by a
    # job whose status update was refused.
    current_job = None


def on_next_job_changed(event):
    """Handle next job execution changed events by starting the job when idle."""
    if event.execution and not current_job:
        print("New job available, starting...")
        request_next_job()


def process_job(job_execution):
    """Process the job - implement your job logic here.

    Returns True when the job document names a supported operation and the
    (simulated) work completed, False otherwise.
    """
    print(f"Processing job: {job_execution.job_id}")

    # job_document is Optional on JobExecutionData; treat a missing document
    # as empty so the job is reported FAILED instead of raising.
    job_doc = job_execution.job_document or {}
    operation = job_doc.get("operation")

    if operation == "firmware_update":
        firmware_url = job_doc.get("firmware_url")
        version = job_doc.get("version")
        print(f"Updating firmware to version {version} from {firmware_url}")

        # Simulate firmware update
        time.sleep(5)
        return True

    if operation == "config_update":
        config = job_doc.get("config")
        print(f"Updating configuration: {config}")

        # Simulate config update
        time.sleep(2)
        return True

    return False


def _subscribe(subscribe_fn, request, callback):
    """Subscribe and block until the broker acknowledges the subscription.

    The subscribe_to_* methods return a (Future, topic) tuple, so the Future
    has to be unpacked before waiting on it.
    """
    subscribed_future, _topic = subscribe_fn(request, mqtt.QoS.AT_LEAST_ONCE, callback)
    subscribed_future.result()


# Subscribe to job responses
_subscribe(
    jobs_client.subscribe_to_start_next_pending_job_execution_accepted,
    iotjobs.StartNextPendingJobExecutionSubscriptionRequest(thing_name=THING_NAME),
    on_start_job_accepted,
)
_subscribe(
    jobs_client.subscribe_to_start_next_pending_job_execution_rejected,
    iotjobs.StartNextPendingJobExecutionSubscriptionRequest(thing_name=THING_NAME),
    on_start_job_rejected,
)
_subscribe(
    jobs_client.subscribe_to_update_job_execution_accepted,
    iotjobs.UpdateJobExecutionSubscriptionRequest(thing_name=THING_NAME),
    on_update_job_accepted,
)
_subscribe(
    jobs_client.subscribe_to_update_job_execution_rejected,
    iotjobs.UpdateJobExecutionSubscriptionRequest(thing_name=THING_NAME),
    on_update_job_rejected,
)

# Subscribe to job change events
_subscribe(
    jobs_client.subscribe_to_next_job_execution_changed_events,
    iotjobs.NextJobExecutionChangedSubscriptionRequest(thing_name=THING_NAME),
    on_next_job_changed,
)

# Check for initial pending jobs
request_next_job()

print("Job processor started. Waiting for jobs...")

# Keep the main thread alive so the callbacks above can keep firing;
# Ctrl+C ends the loop and disconnects cleanly.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping job processor...")
finally:
    connection.disconnect().result()
618
```
619
620
### V2 Client - Request-Response Pattern
621
622
```python
623
from awsiot import mqtt_connection_builder, iotjobs
import asyncio
from datetime import datetime, timezone

# Thing whose jobs this script processes.
THING_NAME = "MyDevice"


async def job_processor():
    """Poll for pending jobs, run each one, and report its final status.

    Loops until a V2ServiceException is raised, then disconnects.
    """
    # Create connection
    connection = mqtt_connection_builder.mtls_from_path(
        endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
        cert_filepath="/path/to/certificate.pem.crt",
        pri_key_filepath="/path/to/private.pem.key",
        client_id="jobs-v2-client"
    )

    # Create V2 client
    # NOTE(review): verify against the installed SDK version — the V2 client
    # may also expect a request-response options argument, and
    # V2ServiceException may be exported from the top-level awsiot package.
    jobs_client = iotjobs.IotJobsClientV2(connection)

    # SDK futures are presumably concurrent.futures.Future objects, which
    # cannot be awaited directly; asyncio.wrap_future bridges them (and
    # passes asyncio futures through unchanged).
    await asyncio.wrap_future(connection.connect())

    try:
        while True:
            # Get pending jobs
            pending_request = iotjobs.GetPendingJobExecutionsRequest(thing_name=THING_NAME)
            pending_response = await asyncio.wrap_future(
                jobs_client.get_pending_job_executions(pending_request)
            )

            # Count in-progress jobs as well, so a job that was already
            # started (e.g. before a restart) is picked up again.
            queued = pending_response.queued_jobs or []
            in_progress = pending_response.in_progress_jobs or []
            if not (queued or in_progress):
                print("No pending jobs, waiting...")
                await asyncio.sleep(30)
                continue

            print(f"Found {len(queued) + len(in_progress)} pending jobs")

            # Start next job
            start_request = iotjobs.StartNextPendingJobExecutionRequest(
                thing_name=THING_NAME,
                step_timeout_in_minutes=15
            )
            start_response = await asyncio.wrap_future(
                jobs_client.start_next_pending_job_execution(start_request)
            )

            job = start_response.execution
            if not job:
                # Nothing was handed out; pause briefly instead of
                # re-polling the service in a tight loop.
                await asyncio.sleep(1)
                continue

            print(f"Started job: {job.job_id}")

            # Process job
            success = await process_job_async(job)

            # Update job status
            status = iotjobs.JobStatus.SUCCEEDED if success else iotjobs.JobStatus.FAILED
            update_request = iotjobs.UpdateJobExecutionRequest(
                thing_name=THING_NAME,
                job_id=job.job_id,
                status=status,
                status_details={"timestamp": datetime.now(timezone.utc).isoformat()}
            )

            await asyncio.wrap_future(jobs_client.update_job_execution(update_request))
            print(f"Job {job.job_id} completed with status: {status}")

    except iotjobs.V2ServiceException as e:
        print(f"Jobs operation failed: {e.message}")
        if e.modeled_error:
            print(f"Error details: {e.modeled_error}")

    finally:
        await asyncio.wrap_future(connection.disconnect())


async def process_job_async(job_execution):
    """Async job processing; returns True when the job succeeded."""
    print(f"Processing job: {job_execution.job_id}")

    # Simulate async job processing
    await asyncio.sleep(3)

    return True


# Run async job processor
asyncio.run(job_processor())
696
```
697
698
### Job Monitoring with Event Streams
699
700
```python
701
from awsiot import mqtt_connection_builder, iotjobs

# Thing whose job events are monitored.
THING_NAME = "MyDevice"


def on_job_changed(event):
    """Handle job executions changed events by printing a per-status count."""
    print(f"Job executions changed: {event}")
    if event.jobs:
        for status, job_list in event.jobs.items():
            print(f"  {status}: {len(job_list)} jobs")


def on_next_job_changed(event):
    """Handle next job execution changed events."""
    print(f"Next job changed: {event}")
    if event.execution:
        print(f"  Next job: {event.execution.job_id}")


# Create and establish the MQTT connection used by the V2 client
connection = mqtt_connection_builder.mtls_from_path(
    endpoint="your-endpoint.iot.us-east-1.amazonaws.com",
    cert_filepath="/path/to/certificate.pem.crt",
    pri_key_filepath="/path/to/private.pem.key",
    client_id="jobs-monitor-123"
)
connection.connect().result()

# Create stream options
stream_options = iotjobs.ServiceStreamOptions(
    incoming_event_listener=on_job_changed
)

next_job_stream_options = iotjobs.ServiceStreamOptions(
    incoming_event_listener=on_next_job_changed
)

# Create V2 client and streams
jobs_client = iotjobs.IotJobsClientV2(connection)

job_stream = jobs_client.create_job_executions_changed_stream(
    iotjobs.JobExecutionsChangedSubscriptionRequest(thing_name=THING_NAME),
    stream_options
)

next_job_stream = jobs_client.create_next_job_execution_changed_stream(
    iotjobs.NextJobExecutionChangedSubscriptionRequest(thing_name=THING_NAME),
    next_job_stream_options
)

# NOTE(review): streaming operations presumably deliver nothing until they
# are opened — confirm open() against the StreamingOperation reference.
# Keep references to both streams for as long as events should be received.
job_stream.open()
next_job_stream.open()
737
```