or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

device-shadows.mdexception-handling.mdgreengrass-discovery.mdindex.mdiot-jobs.mdmqtt-client.md

iot-jobs.mddocs/

0

# AWS IoT Jobs

1

2

Job execution management for device fleet management, including job subscription, execution state management, progress reporting, and job lifecycle operations. AWS IoT Jobs enable you to define a set of remote operations that can be sent to and executed on one or more devices connected to AWS IoT.

3

4

## Capabilities

5

6

### Jobs Client Creation

7

8

Create a specialized MQTT client for AWS IoT Jobs operations with thing-specific configuration and job management capabilities.

9

10

```python { .api }

11

class AWSIoTMQTTThingJobsClient:

12

def __init__(self, clientID: str, thingName: str, QoS: int = 0, protocolType: int = MQTTv3_1_1, useWebsocket: bool = False, cleanSession: bool = True, awsIoTMQTTClient = None):

13

"""

14

Create AWS IoT MQTT Jobs client for thing-specific job operations.

15

16

Args:

17

clientID (str): Client identifier for MQTT connection and job tokens

18

thingName (str): AWS IoT thing name for job topic routing

19

QoS (int): Default QoS level for all job operations (0 or 1)

20

protocolType (int): MQTT version (MQTTv3_1=3, MQTTv3_1_1=4)

21

useWebsocket (bool): Enable MQTT over WebSocket SigV4

22

cleanSession (bool): Start with clean session state

23

awsIoTMQTTClient (AWSIoTMQTTClient): Existing MQTT client to reuse (optional)

24

"""

25

```

26

27

### Job Subscriptions

28

29

Subscribe to various job-related topics for monitoring job state changes and receiving job notifications.

30

31

```python { .api }

32

def createJobSubscription(self, callback: callable, jobExecutionType = None, jobReplyType = None, jobId: str = None) -> bool:

33

"""

34

Subscribe to job-related topic synchronously.

35

36

Args:

37

callback (callable): Message callback (client, userdata, message) -> None

38

jobExecutionType: Topic type from jobExecutionTopicType enum

39

jobReplyType: Reply type from jobExecutionTopicReplyType enum

40

jobId (str): Specific job ID or None for wildcard

41

42

Returns:

43

bool: True if subscription successful, False otherwise

44

"""

45

46

def createJobSubscriptionAsync(self, ackCallback: callable, callback: callable, jobExecutionType = None, jobReplyType = None, jobId: str = None) -> int:

47

"""

48

Subscribe to job-related topic asynchronously.

49

50

Args:

51

ackCallback (callable): SUBACK callback (mid, data) -> None

52

callback (callable): Message callback (client, userdata, message) -> None

53

jobExecutionType: Topic type from jobExecutionTopicType enum

54

jobReplyType: Reply type from jobExecutionTopicReplyType enum

55

jobId (str): Specific job ID or None for wildcard

56

57

Returns:

58

int: Packet ID for tracking in callback

59

"""

60

```

61

62

### Job Queries

63

64

Query job information including pending jobs list and specific job descriptions.

65

66

```python { .api }

67

def sendJobsQuery(self, jobExecTopicType, jobId: str = None) -> bool:

68

"""

69

Send job query request.

70

71

Args:

72

jobExecTopicType: Query type from jobExecutionTopicType enum

73

jobId (str): Job ID for specific queries, or None for general queries

74

75

Returns:

76

bool: True if query sent successfully, False otherwise

77

"""

78

79

def sendJobsDescribe(self, jobId: str, executionNumber: int = 0, includeJobDocument: bool = True) -> bool:

80

"""

81

Request description of specific job execution.

82

83

Args:

84

jobId (str): Job ID to describe (can be '$next' for next pending job)

85

executionNumber (int): Specific execution number (0 for latest)

86

includeJobDocument (bool): Include job document in response

87

88

Returns:

89

bool: True if describe request sent, False otherwise

90

"""

91

```

92

93

### Job Execution Management

94

95

Start, update, and manage job execution lifecycle and status reporting.

96

97

```python { .api }

98

def sendJobsStartNext(self, statusDetails: dict = None, stepTimeoutInMinutes: int = None) -> bool:

99

"""

100

Start next pending job execution.

101

102

Args:

103

statusDetails (dict): Optional status details for job start

104

stepTimeoutInMinutes (int): Timeout for job step execution

105

106

Returns:

107

bool: True if start request sent, False otherwise

108

"""

109

110

def sendJobsUpdate(self, jobId: str, status: int, statusDetails: dict = None, expectedVersion: int = 0, executionNumber: int = 0, includeJobExecutionState: bool = False, includeJobDocument: bool = False, stepTimeoutInMinutes: int = None) -> bool:

111

"""

112

Update job execution status and progress.

113

114

Args:

115

jobId (str): Job ID to update

116

status (int): New job status from jobExecutionStatus enum

117

statusDetails (dict): Optional status details and progress information

118

expectedVersion (int): Expected current version for optimistic locking

119

executionNumber (int): Execution number (0 for latest)

120

includeJobExecutionState (bool): Include execution state in response

121

includeJobDocument (bool): Include job document in response

122

stepTimeoutInMinutes (int): Timeout for next job step

123

124

Returns:

125

bool: True if update sent successfully, False otherwise

126

"""

127

```

128

129

## Job Topic Types and Constants

130

131

```python { .api }

132

# Job execution topic types

133

class jobExecutionTopicType:

134

JOB_UNRECOGNIZED_TOPIC = (0, False, '')

135

JOB_GET_PENDING_TOPIC = (1, False, 'get') # Get list of pending jobs

136

JOB_START_NEXT_TOPIC = (2, False, 'start-next') # Start next pending job

137

JOB_DESCRIBE_TOPIC = (3, True, 'get') # Describe specific job

138

JOB_UPDATE_TOPIC = (4, True, 'update') # Update job status

139

JOB_NOTIFY_TOPIC = (5, False, 'notify') # Job notifications

140

JOB_NOTIFY_NEXT_TOPIC = (6, False, 'notify-next') # Next job notifications

141

JOB_WILDCARD_TOPIC = (7, False, '+') # Wildcard subscription

142

143

# Job execution reply topic types

144

class jobExecutionTopicReplyType:

145

JOB_UNRECOGNIZED_TOPIC_TYPE = (0, '')

146

JOB_REQUEST_TYPE = (1, '') # Request topic (no suffix)

147

JOB_ACCEPTED_REPLY_TYPE = (2, '/accepted') # Accepted response topic

148

JOB_REJECTED_REPLY_TYPE = (3, '/rejected') # Rejected response topic

149

JOB_WILDCARD_REPLY_TYPE = (4, '/#') # Wildcard reply subscription

150

151

# Job execution status values

152

class jobExecutionStatus:

153

JOB_EXECUTION_STATUS_NOT_SET = (0, None)

154

JOB_EXECUTION_QUEUED = (1, 'QUEUED') # Job is queued for execution

155

JOB_EXECUTION_IN_PROGRESS = (2, 'IN_PROGRESS') # Job is currently executing

156

JOB_EXECUTION_FAILED = (3, 'FAILED') # Job execution failed

157

JOB_EXECUTION_SUCCEEDED = (4, 'SUCCEEDED') # Job completed successfully

158

JOB_EXECUTION_CANCELED = (5, 'CANCELED') # Job was canceled

159

JOB_EXECUTION_REJECTED = (6, 'REJECTED') # Job was rejected

160

JOB_EXECUTION_UNKNOWN_STATUS = (99, None) # Unknown status

161

```

162

163

## Usage Examples

164

165

### Basic Job Monitoring

166

167

```python

168

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

169

from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType

170

from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType

171

import json

172

173

# Create jobs client

174

jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("myJobsClient", "myThingName")

175

jobsClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)

176

jobsClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

177

jobsClient.connect()

178

179

# Job notification callback

180

def jobNotificationCallback(client, userdata, message):

181

print(f"Job notification on {message.topic}: {message.payload.decode()}")

182

payload = json.loads(message.payload.decode())

183

184

if "execution" in payload:

185

job_id = payload["execution"]["jobId"]

186

status = payload["execution"]["status"]

187

print(f"Job {job_id} status: {status}")

188

189

# Subscribe to job notifications

190

jobsClient.createJobSubscription(

191

jobNotificationCallback,

192

jobExecutionTopicType.JOB_NOTIFY_TOPIC

193

)

194

195

# Subscribe to next job notifications

196

jobsClient.createJobSubscription(

197

jobNotificationCallback,

198

jobExecutionTopicType.JOB_NOTIFY_NEXT_TOPIC

199

)

200

201

# Query pending jobs

202

jobsClient.sendJobsQuery(jobExecutionTopicType.JOB_GET_PENDING_TOPIC)

203

```

204

205

### Job Execution Workflow

206

207

```python

208

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

209

from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType

210

from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType

211

from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus

212

import json

213

import time

214

215

# Create jobs client

216

jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("deviceJobExecutor", "myDevice")

217

jobsClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)

218

jobsClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

219

jobsClient.connect()

220

221

current_job_id = None

222

223

# Job response callback

224

def jobResponseCallback(client, userdata, message):

225

global current_job_id

226

print(f"Job response on {message.topic}: {message.payload.decode()}")

227

228

payload = json.loads(message.payload.decode())

229

230

# Handle start-next response

231

if "execution" in payload and payload["execution"]["jobId"]:

232

current_job_id = payload["execution"]["jobId"]

233

job_document = payload["execution"]["jobDocument"]

234

235

print(f"Started job {current_job_id}")

236

print(f"Job document: {job_document}")

237

238

# Execute job logic here

239

execute_job(current_job_id, job_document)

240

241

def execute_job(job_id, job_document):

242

"""Execute the job and report progress"""

243

try:

244

print(f"Executing job {job_id}...")

245

246

# Update job status to IN_PROGRESS

247

jobsClient.sendJobsUpdate(

248

job_id,

249

jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS[0],

250

{"progress": "starting execution"}

251

)

252

253

# Simulate job execution steps

254

for step in range(1, 4):

255

print(f"Executing step {step}...")

256

time.sleep(2) # Simulate work

257

258

# Report progress

259

jobsClient.sendJobsUpdate(

260

job_id,

261

jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS[0],

262

{"progress": f"completed step {step}/3"}

263

)

264

265

# Job completed successfully

266

jobsClient.sendJobsUpdate(

267

job_id,

268

jobExecutionStatus.JOB_EXECUTION_SUCCEEDED[0],

269

{"result": "job completed successfully"}

270

)

271

272

print(f"Job {job_id} completed successfully")

273

274

except Exception as e:

275

# Job failed

276

jobsClient.sendJobsUpdate(

277

job_id,

278

jobExecutionStatus.JOB_EXECUTION_FAILED[0],

279

{"error": str(e)}

280

)

281

print(f"Job {job_id} failed: {e}")

282

283

# Subscribe to job responses

284

jobsClient.createJobSubscription(

285

jobResponseCallback,

286

jobExecutionTopicType.JOB_START_NEXT_TOPIC,

287

jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE

288

)

289

290

jobsClient.createJobSubscription(

291

jobResponseCallback,

292

jobExecutionTopicType.JOB_UPDATE_TOPIC,

293

jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE

294

)

295

296

# Start next pending job

297

print("Starting next pending job...")

298

jobsClient.sendJobsStartNext({"device": "ready for work"})

299

300

# Keep running to process jobs

301

try:

302

while True:

303

time.sleep(1)

304

except KeyboardInterrupt:

305

jobsClient.disconnect()

306

```

307

308

### Advanced Job Management

309

310

```python

311

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

312

from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicType

313

from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionTopicReplyType

314

from AWSIoTPythonSDK.core.jobs.thingJobManager import jobExecutionStatus

315

import json

316

317

# Create jobs client with higher QoS

318

jobsClient = AWSIoTPyMQTT.AWSIoTMQTTThingJobsClient("advancedJobClient", "myDevice", QoS=1)

319

jobsClient.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)

320

jobsClient.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

321

jobsClient.connect()

322

323

# Advanced job callback with detailed handling

324

def advancedJobCallback(client, userdata, message):

325

topic = message.topic

326

payload = json.loads(message.payload.decode())

327

328

print(f"Received job message on {topic}")

329

330

if "/accepted" in topic:

331

if "execution" in payload:

332

job_id = payload["execution"]["jobId"]

333

status = payload["execution"]["status"]

334

version = payload["execution"]["versionNumber"]

335

336

print(f"Job {job_id} accepted - Status: {status}, Version: {version}")

337

338

# Handle job document updates with versioning

339

if "jobDocument" in payload["execution"]:

340

job_doc = payload["execution"]["jobDocument"]

341

process_job_with_version(job_id, job_doc, version)

342

343

elif "/rejected" in topic:

344

error_code = payload.get("code", "Unknown")

345

error_message = payload.get("message", "No details")

346

print(f"Job operation rejected - Code: {error_code}, Message: {error_message}")

347

348

def process_job_with_version(job_id, job_document, expected_version):

349

"""Process job with version control for concurrent updates"""

350

try:

351

# Process job document

352

operation = job_document.get("operation", "unknown")

353

parameters = job_document.get("parameters", {})

354

355

print(f"Processing {operation} with parameters: {parameters}")

356

357

# Update with expected version for optimistic locking

358

jobsClient.sendJobsUpdate(

359

job_id,

360

jobExecutionStatus.JOB_EXECUTION_IN_PROGRESS[0],

361

statusDetails={"operation": operation, "stage": "processing"},

362

expectedVersion=expected_version,

363

includeJobExecutionState=True

364

)

365

366

# Simulate processing time

367

import time

368

time.sleep(3)

369

370

# Complete with final status

371

jobsClient.sendJobsUpdate(

372

job_id,

373

jobExecutionStatus.JOB_EXECUTION_SUCCEEDED[0],

374

statusDetails={"operation": operation, "result": "completed"},

375

expectedVersion=expected_version + 1, # Version incremented by previous update

376

includeJobDocument=False

377

)

378

379

except Exception as e:

380

jobsClient.sendJobsUpdate(

381

job_id,

382

jobExecutionStatus.JOB_EXECUTION_FAILED[0],

383

statusDetails={"error": str(e)},

384

expectedVersion=expected_version

385

)

386

387

# Subscribe to multiple job topics with different callbacks

388

jobsClient.createJobSubscription(

389

advancedJobCallback,

390

jobExecutionTopicType.JOB_START_NEXT_TOPIC,

391

jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE

392

)

393

394

jobsClient.createJobSubscription(

395

advancedJobCallback,

396

jobExecutionTopicType.JOB_UPDATE_TOPIC,

397

jobExecutionTopicReplyType.JOB_ACCEPTED_REPLY_TYPE

398

)

399

400

jobsClient.createJobSubscription(

401

advancedJobCallback,

402

jobExecutionTopicType.JOB_UPDATE_TOPIC,

403

jobExecutionTopicReplyType.JOB_REJECTED_REPLY_TYPE

404

)

405

406

# Query specific job details

407

specific_job_id = "job-12345"

408

jobsClient.sendJobsDescribe(

409

specific_job_id,

410

executionNumber=0,

411

includeJobDocument=True

412

)

413

414

# Start next job with timeout

415

jobsClient.sendJobsStartNext(

416

statusDetails={"device_info": "ready", "capabilities": ["firmware_update", "config_change"]},

417

stepTimeoutInMinutes=30

418

)

419

```

420

421

## Types

422

423

```python { .api }

424

# Job execution callback signature

425

def jobCallback(client, userdata: dict, message) -> None:

426

"""

427

Job operation callback.

428

429

Args:

430

client: MQTT client instance

431

userdata (dict): User data passed to callback

432

message: MQTT message with .topic and .payload attributes

433

"""

434

435

# Job document structure (example)

436

job_document = {

437

"operation": "firmware_update",

438

"parameters": {

439

"firmware_url": "https://example.com/firmware.bin",

440

"version": "1.2.3",

441

"checksum": "sha256:abc123..."

442

},

443

"timeout": 3600

444

}

445

446

# Job execution response structure (example)

447

job_execution_response = {

448

"execution": {

449

"jobId": "job-12345",

450

"status": "IN_PROGRESS",

451

"statusDetails": {

452

"progress": "50%",

453

"step": "downloading"

454

},

455

"queuedAt": 1609459200,

456

"startedAt": 1609459210,

457

"lastUpdatedAt": 1609459250,

458

"versionNumber": 2,

459

"executionNumber": 1,

460

"jobDocument": {

461

"operation": "firmware_update",

462

"parameters": {...}

463

}

464

},

465

"timestamp": 1609459250,

466

"clientToken": "token-abc-123"

467

}

468

```