or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

device-shadow.md, fleet-provisioning.md, greengrass-discovery.md, greengrass-ipc.md, index.md, iot-jobs.md, mqtt-connections.md

docs/iot-jobs.md

0

# IoT Jobs Management

1

2

Job execution capabilities for AWS IoT device management: job discovery, execution updates, status reporting, and real-time job notifications. Both pending and in-progress jobs are supported through the V1 (callback-based) and V2 (Future-based) client interfaces.

3

4

## Capabilities

5

6

### V1 Jobs Client (Callback-Based)

7

8

The V1 client provides callback-based operations following the traditional publish/subscribe pattern.

9

10

#### Client Creation

11

12

```python { .api }

13

class IotJobsClient(MqttServiceClient):

14

"""

15

V1 client for AWS IoT Jobs service with callback-based operations.

16

17

Parameters:

18

- mqtt_connection: MQTT connection (mqtt.Connection or mqtt5.Client)

19

"""

20

def __init__(self, mqtt_connection): ...

21

```

22

23

Usage example:

24

25

```python

26

from awsiot import mqtt_connection_builder, iotjobs

27

28

# Create MQTT connection

29

connection = mqtt_connection_builder.mtls_from_path(

30

endpoint="your-endpoint.iot.us-east-1.amazonaws.com",

31

cert_filepath="/path/to/certificate.pem.crt",

32

pri_key_filepath="/path/to/private.pem.key",

33

client_id="jobs-client-123"

34

)

35

36

# Create jobs client

37

jobs_client = iotjobs.IotJobsClient(connection)

38

```

39

40

#### Job Discovery and Execution

41

42

##### Get Pending Job Executions

43

44

```python { .api }

45

def publish_get_pending_job_executions(self, request, qos):

46

"""

47

Publish request to get list of pending job executions.

48

49

Parameters:

50

- request (GetPendingJobExecutionsRequest): Request for pending jobs

51

- qos (awscrt.mqtt.QoS): Quality of service

52

53

Returns:

54

Future: Future that completes when request is published

55

"""

56

57

def subscribe_to_get_pending_job_executions_accepted(self, request, qos, callback):

58

"""

59

Subscribe to successful get pending jobs responses.

60

61

Parameters:

62

- request (GetPendingJobExecutionsSubscriptionRequest): Subscription request

63

- qos (awscrt.mqtt.QoS): Quality of service

64

- callback: Function called with pending jobs response

65

66

Returns:

67

Tuple[Future, str]: Future for subscription, topic string

68

"""

69

70

def subscribe_to_get_pending_job_executions_rejected(self, request, qos, callback):

71

"""

72

Subscribe to rejected get pending jobs responses.

73

74

Parameters:

75

- request (GetPendingJobExecutionsSubscriptionRequest): Subscription request

76

- qos (awscrt.mqtt.QoS): Quality of service

77

- callback: Function called when request is rejected

78

79

Returns:

80

Tuple[Future, str]: Future for subscription, topic string

81

"""

82

```

83

84

##### Start Next Pending Job Execution

85

86

```python { .api }

87

def publish_start_next_pending_job_execution(self, request, qos):

88

"""

89

Publish request to start the next pending job execution.

90

91

Parameters:

92

- request (StartNextPendingJobExecutionRequest): Start job request

93

- qos (awscrt.mqtt.QoS): Quality of service

94

95

Returns:

96

Future: Future that completes when request is published

97

"""

98

99

def subscribe_to_start_next_pending_job_execution_accepted(self, request, qos, callback):

100

"""

101

Subscribe to successful start next job responses.

102

103

Parameters:

104

- request (StartNextPendingJobExecutionSubscriptionRequest): Subscription request

105

- qos (awscrt.mqtt.QoS): Quality of service

106

- callback: Function called when job start succeeds

107

108

Returns:

109

Tuple[Future, str]: Future for subscription, topic string

110

"""

111

112

def subscribe_to_start_next_pending_job_execution_rejected(self, request, qos, callback):

113

"""

114

Subscribe to rejected start next job responses.

115

"""

116

```

117

118

##### Describe Job Execution

119

120

```python { .api }

121

def publish_describe_job_execution(self, request, qos):

122

"""

123

Publish request to describe a specific job execution.

124

125

Parameters:

126

- request (DescribeJobExecutionRequest): Describe job request

127

- qos (awscrt.mqtt.QoS): Quality of service

128

129

Returns:

130

Future: Future that completes when request is published

131

"""

132

133

def subscribe_to_describe_job_execution_accepted(self, request, qos, callback):

134

"""

135

Subscribe to successful describe job responses.

136

"""

137

138

def subscribe_to_describe_job_execution_rejected(self, request, qos, callback):

139

"""

140

Subscribe to rejected describe job responses.

141

"""

142

```

143

144

##### Update Job Execution

145

146

```python { .api }

147

def publish_update_job_execution(self, request, qos):

148

"""

149

Publish request to update job execution status.

150

151

Parameters:

152

- request (UpdateJobExecutionRequest): Update job request

153

- qos (awscrt.mqtt.QoS): Quality of service

154

155

Returns:

156

Future: Future that completes when request is published

157

"""

158

159

def subscribe_to_update_job_execution_accepted(self, request, qos, callback):

160

"""

161

Subscribe to successful update job responses.

162

"""

163

164

def subscribe_to_update_job_execution_rejected(self, request, qos, callback):

165

"""

166

Subscribe to rejected update job responses.

167

"""

168

```

169

170

#### Job Event Subscriptions

171

172

##### Job Executions Changed Events

173

174

```python { .api }

175

def subscribe_to_job_executions_changed_events(self, request, qos, callback):

176

"""

177

Subscribe to job executions changed events.

178

179

Parameters:

180

- request (JobExecutionsChangedSubscriptionRequest): Subscription request

181

- qos (awscrt.mqtt.QoS): Quality of service

182

- callback: Function called when job executions change

183

184

Returns:

185

Tuple[Future, str]: Future for subscription, topic string

186

"""

187

```

188

189

##### Next Job Execution Changed Events

190

191

```python { .api }

192

def subscribe_to_next_job_execution_changed_events(self, request, qos, callback):

193

"""

194

Subscribe to next job execution changed events.

195

196

Parameters:

197

- request (NextJobExecutionChangedSubscriptionRequest): Subscription request

198

- qos (awscrt.mqtt.QoS): Quality of service

199

- callback: Function called when next job execution changes

200

201

Returns:

202

Tuple[Future, str]: Future for subscription, topic string

203

"""

204

```

205

206

### V2 Jobs Client (Future-Based)

207

208

The V2 client provides Future-based operations with request-response semantics.

209

210

#### Client Creation

211

212

```python { .api }

213

class IotJobsClientV2:

214

"""

215

V2 client for AWS IoT Jobs service with Future-based operations.

216

"""

217

def __init__(self, connection): ...

218

```

219

220

#### Job Operations

221

222

```python { .api }

223

def get_pending_job_executions(self, request):

224

"""

225

Get list of pending job executions using request-response pattern.

226

227

Parameters:

228

- request (GetPendingJobExecutionsRequest): Request for pending jobs

229

230

Returns:

231

Future[GetPendingJobExecutionsResponse]: Future containing pending jobs response

232

"""

233

234

def start_next_pending_job_execution(self, request):

235

"""

236

Start the next pending job execution using request-response pattern.

237

238

Parameters:

239

- request (StartNextPendingJobExecutionRequest): Start job request

240

241

Returns:

242

Future[StartNextJobExecutionResponse]: Future containing job start response

243

"""

244

245

def describe_job_execution(self, request):

246

"""

247

Describe a specific job execution using request-response pattern.

248

249

Parameters:

250

- request (DescribeJobExecutionRequest): Describe job request

251

252

Returns:

253

Future[DescribeJobExecutionResponse]: Future containing job description

254

"""

255

256

def update_job_execution(self, request):

257

"""

258

Update job execution status using request-response pattern.

259

260

Parameters:

261

- request (UpdateJobExecutionRequest): Update job request

262

263

Returns:

264

Future[UpdateJobExecutionResponse]: Future containing update response

265

"""

266

```

267

268

#### Streaming Operations

269

270

```python { .api }

271

def create_job_executions_changed_stream(self, request, options):

272

"""

273

Create streaming operation for job executions changed events.

274

275

Parameters:

276

- request (JobExecutionsChangedSubscriptionRequest): Stream request

277

- options (ServiceStreamOptions): Stream configuration

278

279

Returns:

280

StreamingOperation: Streaming operation handle

281

"""

282

283

def create_next_job_execution_changed_stream(self, request, options):

284

"""

285

Create streaming operation for next job execution changed events.

286

287

Parameters:

288

- request (NextJobExecutionChangedSubscriptionRequest): Stream request

289

- options (ServiceStreamOptions): Stream configuration

290

291

Returns:

292

StreamingOperation: Streaming operation handle

293

"""

294

```

295

296

### Data Model Classes

297

298

#### Request Classes

299

300

```python { .api }

301

@dataclass

302

class GetPendingJobExecutionsRequest:

303

"""Request to get pending job executions."""

304

thing_name: str

305

306

@dataclass

307

class StartNextPendingJobExecutionRequest:

308

"""Request to start next pending job execution."""

309

thing_name: str

310

step_timeout_in_minutes: Optional[int] = None

311

312

@dataclass

313

class DescribeJobExecutionRequest:

314

"""Request to describe a job execution."""

315

thing_name: str

316

job_id: str

317

execution_number: Optional[int] = None

318

include_job_document: Optional[bool] = None

319

320

@dataclass

321

class UpdateJobExecutionRequest:

322

"""Request to update job execution status."""

323

thing_name: str

324

job_id: str

325

status: str # JobStatus constant

326

status_details: Optional[Dict[str, str]] = None

327

step_timeout_in_minutes: Optional[int] = None

328

expected_version: Optional[int] = None

329

execution_number: Optional[int] = None

330

include_job_execution_state: Optional[bool] = None

331

include_job_document: Optional[bool] = None

332

```

333

334

#### Response Classes

335

336

```python { .api }

337

@dataclass

338

class GetPendingJobExecutionsResponse:

339

"""Response from get pending job executions."""

340

in_progress_jobs: Optional[List[JobExecutionSummary]] = None

341

queued_jobs: Optional[List[JobExecutionSummary]] = None

342

timestamp: Optional[datetime.datetime] = None

343

client_token: Optional[str] = None

344

345

@dataclass

346

class StartNextJobExecutionResponse:

347

"""Response from start next job execution."""

348

execution: Optional[JobExecutionData] = None

349

timestamp: Optional[datetime.datetime] = None

350

client_token: Optional[str] = None

351

352

@dataclass

353

class DescribeJobExecutionResponse:

354

"""Response from describe job execution."""

355

execution: Optional[JobExecutionData] = None

356

timestamp: Optional[datetime.datetime] = None

357

client_token: Optional[str] = None

358

359

@dataclass

360

class UpdateJobExecutionResponse:

361

"""Response from update job execution."""

362

execution_state: Optional[JobExecutionState] = None

363

job_document: Optional[Dict[str, Any]] = None

364

timestamp: Optional[datetime.datetime] = None

365

client_token: Optional[str] = None

366

```

367

368

#### Job Data Classes

369

370

```python { .api }

371

@dataclass

372

class JobExecutionData:

373

"""Complete job execution data."""

374

job_id: Optional[str] = None

375

thing_name: Optional[str] = None

376

job_document: Optional[Dict[str, Any]] = None

377

status: Optional[str] = None

378

status_details: Optional[Dict[str, str]] = None

379

queued_at: Optional[datetime.datetime] = None

380

started_at: Optional[datetime.datetime] = None

381

last_updated_at: Optional[datetime.datetime] = None

382

version_number: Optional[int] = None

383

execution_number: Optional[int] = None

384

385

@dataclass

386

class JobExecutionState:

387

"""Job execution state information."""

388

status: Optional[str] = None

389

status_details: Optional[Dict[str, str]] = None

390

version_number: Optional[int] = None

391

392

@dataclass

393

class JobExecutionSummary:

394

"""Summary of job execution."""

395

job_id: Optional[str] = None

396

thing_name: Optional[str] = None

397

version_number: Optional[int] = None

398

execution_number: Optional[int] = None

399

queued_at: Optional[datetime.datetime] = None

400

started_at: Optional[datetime.datetime] = None

401

last_updated_at: Optional[datetime.datetime] = None

402

```

403

404

#### Event Classes

405

406

```python { .api }

407

@dataclass

408

class JobExecutionsChangedEvent:

409

"""Event when job executions change."""

410

jobs: Optional[Dict[str, List[JobExecutionSummary]]] = None

411

timestamp: Optional[datetime.datetime] = None

412

413

@dataclass

414

class NextJobExecutionChangedEvent:

415

"""Event when next job execution changes."""

416

execution: Optional[JobExecutionData] = None

417

timestamp: Optional[datetime.datetime] = None

418

```

419

420

#### Subscription Request Classes

421

422

```python { .api }

423

@dataclass

424

class GetPendingJobExecutionsSubscriptionRequest:

425

"""Subscription request for get pending jobs responses."""

426

thing_name: str

427

428

@dataclass

429

class JobExecutionsChangedSubscriptionRequest:

430

"""Subscription request for job executions changed events."""

431

thing_name: str

432

433

@dataclass

434

class NextJobExecutionChangedSubscriptionRequest:

435

"""Subscription request for next job execution changed events."""

436

thing_name: str

437

```

438

439

#### Error Classes

440

441

```python { .api }

442

@dataclass

443

class RejectedError:

444

"""Error response from jobs operations."""

445

code: Optional[str] = None # RejectedErrorCode constant

446

message: Optional[str] = None

447

timestamp: Optional[datetime.datetime] = None

448

client_token: Optional[str] = None

449

```

450

451

### Constants

452

453

#### Job Status

454

455

```python { .api }

456

class JobStatus:

457

"""Job execution status constants."""

458

CANCELED = "CANCELED"

459

FAILED = "FAILED"

460

QUEUED = "QUEUED"

461

IN_PROGRESS = "IN_PROGRESS"

462

SUCCEEDED = "SUCCEEDED"

463

TIMED_OUT = "TIMED_OUT"

464

REJECTED = "REJECTED"

465

REMOVED = "REMOVED"

466

```

467

468

#### Rejected Error Codes

469

470

```python { .api }

471

class RejectedErrorCode:

472

"""Error code constants for rejected operations."""

473

INTERNAL_ERROR = "InternalError"

474

INVALID_JSON = "InvalidJson"

475

INVALID_REQUEST = "InvalidRequest"

476

INVALID_STATE_TRANSITION = "InvalidStateTransition"

477

RESOURCE_NOT_FOUND = "ResourceNotFound"

478

VERSION_MISMATCH = "VersionMismatch"

479

INVALID_TOPIC = "InvalidTopic"

480

REQUEST_THROTTLED = "RequestThrottled"

481

TERMINAL_STATE_REACHED = "TerminalStateReached"

482

```

483

484

## Usage Examples

485

486

### V1 Client - Basic Job Processing Loop

487

488

```python

489

from awsiot import mqtt_connection_builder, iotjobs

490

from awscrt import mqtt

491

import json

492

import time

493

494

# Create connection and client

495

connection = mqtt_connection_builder.mtls_from_path(

496

endpoint="your-endpoint.iot.us-east-1.amazonaws.com",

497

cert_filepath="/path/to/certificate.pem.crt",

498

pri_key_filepath="/path/to/private.pem.key",

499

client_id="job-processor-123"

500

)

501

502

jobs_client = iotjobs.IotJobsClient(connection)

503

connection.connect().result()

504

505

# Track current job

506

current_job = None

507

508

def on_start_job_accepted(response):

509

"""Handle successful job start."""

510

global current_job

511

if response.execution:

512

current_job = response.execution

513

print(f"Started job: {current_job.job_id}")

514

print(f"Job document: {current_job.job_document}")

515

516

# Process the job (example: firmware update)

517

success = process_job(current_job)

518

519

# Update job status

520

status = iotjobs.JobStatus.SUCCEEDED if success else iotjobs.JobStatus.FAILED

521

update_request = iotjobs.UpdateJobExecutionRequest(

522

thing_name="MyDevice",

523

job_id=current_job.job_id,

524

status=status,

525

status_details={"result": "Job completed successfully" if success else "Job failed"}

526

)

527

jobs_client.publish_update_job_execution(update_request, mqtt.QoS.AT_LEAST_ONCE)

528

529

def on_start_job_rejected(error):

530

"""Handle job start rejection."""

531

print(f"Job start rejected: {error}")

532

533

def on_update_job_accepted(response):

534

"""Handle successful job update."""

535

print(f"Job update accepted: {response}")

536

global current_job

537

current_job = None

538

539

def on_update_job_rejected(error):

540

"""Handle job update rejection."""

541

print(f"Job update rejected: {error}")

542

543

def on_next_job_changed(event):

544

"""Handle next job execution changed events."""

545

if event.execution and not current_job:

546

print("New job available, starting...")

547

start_request = iotjobs.StartNextPendingJobExecutionRequest(

548

thing_name="MyDevice",

549

step_timeout_in_minutes=10

550

)

551

jobs_client.publish_start_next_pending_job_execution(start_request, mqtt.QoS.AT_LEAST_ONCE)

552

553

def process_job(job_execution):

554

"""Process the job - implement your job logic here."""

555

print(f"Processing job: {job_execution.job_id}")

556

557

# Example job processing based on job document

558

job_doc = job_execution.job_document

559

if job_doc.get("operation") == "firmware_update":

560

firmware_url = job_doc.get("firmware_url")

561

version = job_doc.get("version")

562

print(f"Updating firmware to version {version} from {firmware_url}")

563

564

# Simulate firmware update

565

time.sleep(5)

566

return True

567

568

elif job_doc.get("operation") == "config_update":

569

config = job_doc.get("config")

570

print(f"Updating configuration: {config}")

571

572

# Simulate config update

573

time.sleep(2)

574

return True

575

576

return False

577

578

# Subscribe to job responses

579

jobs_client.subscribe_to_start_next_pending_job_execution_accepted(

580

iotjobs.StartNextPendingJobExecutionSubscriptionRequest(thing_name="MyDevice"),

581

mqtt.QoS.AT_LEAST_ONCE,

582

on_start_job_accepted

583

).result()

584

585

jobs_client.subscribe_to_start_next_pending_job_execution_rejected(

586

iotjobs.StartNextPendingJobExecutionSubscriptionRequest(thing_name="MyDevice"),

587

mqtt.QoS.AT_LEAST_ONCE,

588

on_start_job_rejected

589

).result()

590

591

jobs_client.subscribe_to_update_job_execution_accepted(

592

iotjobs.UpdateJobExecutionSubscriptionRequest(thing_name="MyDevice"),

593

mqtt.QoS.AT_LEAST_ONCE,

594

on_update_job_accepted

595

).result()

596

597

jobs_client.subscribe_to_update_job_execution_rejected(

598

iotjobs.UpdateJobExecutionSubscriptionRequest(thing_name="MyDevice"),

599

mqtt.QoS.AT_LEAST_ONCE,

600

on_update_job_rejected

601

).result()

602

603

# Subscribe to job change events

604

jobs_client.subscribe_to_next_job_execution_changed_events(

605

iotjobs.NextJobExecutionChangedSubscriptionRequest(thing_name="MyDevice"),

606

mqtt.QoS.AT_LEAST_ONCE,

607

on_next_job_changed

608

).result()

609

610

# Check for initial pending jobs

611

start_request = iotjobs.StartNextPendingJobExecutionRequest(

612

thing_name="MyDevice",

613

step_timeout_in_minutes=10

614

)

615

jobs_client.publish_start_next_pending_job_execution(start_request, mqtt.QoS.AT_LEAST_ONCE)

616

617

print("Job processor started. Waiting for jobs...")

618

```

619

620

### V2 Client - Request-Response Pattern

621

622

```python

623

from awsiot import mqtt_connection_builder, iotjobs

624

import asyncio
from datetime import datetime

625

626

async def job_processor():

627

# Create connection

628

connection = mqtt_connection_builder.mtls_from_path(

629

endpoint="your-endpoint.iot.us-east-1.amazonaws.com",

630

cert_filepath="/path/to/certificate.pem.crt",

631

pri_key_filepath="/path/to/private.pem.key",

632

client_id="jobs-v2-client"

633

)

634

635

# Create V2 client

636

jobs_client = iotjobs.IotJobsClientV2(connection)

637

await connection.connect()

638

639

try:

640

while True:

641

# Get pending jobs

642

pending_request = iotjobs.GetPendingJobExecutionsRequest(thing_name="MyDevice")

643

pending_response = await jobs_client.get_pending_job_executions(pending_request)

644

645

if pending_response.queued_jobs:

646

print(f"Found {len(pending_response.queued_jobs)} pending jobs")

647

648

# Start next job

649

start_request = iotjobs.StartNextPendingJobExecutionRequest(

650

thing_name="MyDevice",

651

step_timeout_in_minutes=15

652

)

653

start_response = await jobs_client.start_next_pending_job_execution(start_request)

654

655

if start_response.execution:

656

job = start_response.execution

657

print(f"Started job: {job.job_id}")

658

659

# Process job

660

success = await process_job_async(job)

661

662

# Update job status

663

status = iotjobs.JobStatus.SUCCEEDED if success else iotjobs.JobStatus.FAILED

664

update_request = iotjobs.UpdateJobExecutionRequest(

665

thing_name="MyDevice",

666

job_id=job.job_id,

667

status=status,

668

status_details={"timestamp": str(datetime.now())}

669

)

670

671

update_response = await jobs_client.update_job_execution(update_request)

672

print(f"Job {job.job_id} completed with status: {status}")

673

else:

674

print("No pending jobs, waiting...")

675

await asyncio.sleep(30)

676

677

except iotjobs.V2ServiceException as e:

678

print(f"Jobs operation failed: {e.message}")

679

if e.modeled_error:

680

print(f"Error details: {e.modeled_error}")

681

682

finally:

683

await connection.disconnect()

684

685

async def process_job_async(job_execution):

686

"""Async job processing."""

687

print(f"Processing job: {job_execution.job_id}")

688

689

# Simulate async job processing

690

await asyncio.sleep(3)

691

692

return True

693

694

# Run async job processor

695

asyncio.run(job_processor())

696

```

697

698

### Job Monitoring with Event Streams

699

700

```python

701

from awsiot import iotjobs

702

703

def on_job_changed(event):

704

"""Handle job executions changed events."""

705

print(f"Job executions changed: {event}")

706

if event.jobs:

707

for status, job_list in event.jobs.items():

708

print(f" {status}: {len(job_list)} jobs")

709

710

def on_next_job_changed(event):

711

"""Handle next job execution changed events."""

712

print(f"Next job changed: {event}")

713

if event.execution:

714

print(f" Next job: {event.execution.job_id}")

715

716

# Create stream options

717

stream_options = iotjobs.ServiceStreamOptions(

718

incoming_event_listener=on_job_changed

719

)

720

721

next_job_stream_options = iotjobs.ServiceStreamOptions(

722

incoming_event_listener=on_next_job_changed

723

)

724

725

# Create V2 client and streams

726

jobs_client = iotjobs.IotJobsClientV2(connection)

727

728

job_stream = jobs_client.create_job_executions_changed_stream(

729

iotjobs.JobExecutionsChangedSubscriptionRequest(thing_name="MyDevice"),

730

stream_options

731

)

732

733

next_job_stream = jobs_client.create_next_job_execution_changed_stream(

734

iotjobs.NextJobExecutionChangedSubscriptionRequest(thing_name="MyDevice"),

735

next_job_stream_options

736

)

737

```