or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

device-shadows.md exception-handling.md greengrass-discovery.md index.md iot-jobs.md mqtt-client.md

mqtt-client.md docs/

0

# Core MQTT Operations

1

2

Core MQTT connectivity for AWS IoT providing secure communication through TLS mutual authentication or WebSocket SigV4. Includes connection management, publish/subscribe operations, and advanced features like auto-reconnect, progressive backoff, and offline request queueing.

3

4

## Capabilities

5

6

### MQTT Client Creation

7

8

Create an MQTT client with flexible configuration options for protocol version, connection type, and session management.

9

10

```python { .api }

11

class AWSIoTMQTTClient:

12

def __init__(self, clientID: str, protocolType: int = MQTTv3_1_1, useWebsocket: bool = False, cleanSession: bool = True):

13

"""

14

Create AWS IoT MQTT client.

15

16

Args:

17

clientID (str): Client identifier for MQTT connection

18

protocolType (int): MQTT version (MQTTv3_1=3, MQTTv3_1_1=4)

19

useWebsocket (bool): Enable MQTT over WebSocket SigV4

20

cleanSession (bool): Start with clean session state

21

"""

22

```

23

24

### Connection Configuration

25

26

Configure endpoint, authentication, and connection parameters before establishing connection.

27

28

```python { .api }

29

def configureEndpoint(self, hostName: str, portNumber: int):

30

"""

31

Configure AWS IoT endpoint.

32

33

Args:

34

hostName (str): AWS IoT endpoint hostname

35

portNumber (int): Port (8883 for TLS, 443 for WebSocket/ALPN)

36

"""

37

38

def configureCredentials(self, CAFilePath: str, KeyPath: str = "", CertificatePath: str = "", Ciphers: str = None):

39

"""

40

Configure X.509 certificate credentials for TLS mutual authentication.

41

42

Args:

43

CAFilePath (str): Path to root CA certificate file

44

KeyPath (str): Path to private key file (required for TLS)

45

CertificatePath (str): Path to device certificate file (required for TLS)

46

Ciphers (str): SSL cipher suite string (optional)

47

"""

48

49

def configureIAMCredentials(self, AWSAccessKeyID: str, AWSSecretAccessKey: str, AWSSessionToken: str = ""):

50

"""

51

Configure IAM credentials for WebSocket SigV4 authentication.

52

53

Args:

54

AWSAccessKeyID (str): AWS access key ID

55

AWSSecretAccessKey (str): AWS secret access key

56

AWSSessionToken (str): AWS session token for temporary credentials

57

"""

58

59

def configureUsernamePassword(self, username: str, password: str = None):

60

"""

61

Configure MQTT username and password.

62

63

Args:

64

username (str): MQTT username

65

password (str): MQTT password (optional)

66

"""

67

68

def configureLastWill(self, topic: str, payload: str, QoS: int, retain: bool = False):

69

"""

70

Configure Last Will and Testament message for unexpected disconnections.

71

72

Args:

73

topic (str): Topic to publish last will message to

74

payload (str): Last will message payload

75

QoS (int): Quality of Service level (0 or 1)

76

retain (bool): Whether to retain the last will message

77

"""

78

79

def clearLastWill(self):

80

"""

81

Clear the previously configured Last Will and Testament.

82

"""

83

```

84

85

### Advanced Configuration

86

87

Configure auto-reconnect behavior, offline queueing, timeouts, and other advanced features.

88

89

```python { .api }

90

def configureAutoReconnectBackoffTime(self, baseReconnectQuietTimeSecond: int, maxReconnectQuietTimeSecond: int, stableConnectionTimeSecond: int):

91

"""

92

Configure progressive reconnect backoff timing.

93

94

Args:

95

baseReconnectQuietTimeSecond (int): Initial backoff time in seconds

96

maxReconnectQuietTimeSecond (int): Maximum backoff time in seconds

97

stableConnectionTimeSecond (int): Stable connection threshold in seconds

98

"""

99

100

def configureOfflinePublishQueueing(self, queueSize: int, dropBehavior: int = DROP_NEWEST):

101

"""

102

Configure offline request queueing for publish operations.

103

104

Args:

105

queueSize (int): Queue size (0=disabled, -1=infinite)

106

dropBehavior (int): DROP_OLDEST=0 or DROP_NEWEST=1

107

"""

108

109

def configureDrainingFrequency(self, frequencyInHz: float):

110

"""

111

Configure queue draining speed when connection restored.

112

113

Args:

114

frequencyInHz (float): Draining frequency in requests per second

115

"""

116

117

def configureConnectDisconnectTimeout(self, timeoutSecond: int):

118

"""

119

Configure connection/disconnection timeout.

120

121

Args:

122

timeoutSecond (int): Timeout in seconds for CONNACK/disconnect

123

"""

124

125

def configureMQTTOperationTimeout(self, timeoutSecond: int):

126

"""

127

Configure MQTT operation timeout for QoS 1 operations.

128

129

Args:

130

timeoutSecond (int): Timeout in seconds for PUBACK/SUBACK/UNSUBACK

131

"""

132

```

133

134

### Connection Management

135

136

Establish and manage MQTT connections with both synchronous and asynchronous options.

137

138

```python { .api }

139

def connect(self, keepAliveIntervalSecond: int = 600) -> bool:

140

"""

141

Connect to AWS IoT synchronously.

142

143

Args:

144

keepAliveIntervalSecond (int): MQTT keep-alive interval in seconds

145

146

Returns:

147

bool: True if connection successful, False otherwise

148

"""

149

150

def connectAsync(self, keepAliveIntervalSecond: int = 600, ackCallback: callable = None) -> int:

151

"""

152

Connect to AWS IoT asynchronously.

153

154

Args:

155

keepAliveIntervalSecond (int): MQTT keep-alive interval in seconds

156

ackCallback (callable): Callback for CONNACK (mid, data) -> None

157

158

Returns:

159

int: Packet ID for tracking in callback

160

"""

161

162

def disconnect(self) -> bool:

163

"""

164

Disconnect from AWS IoT synchronously.

165

166

Returns:

167

bool: True if disconnect successful, False otherwise

168

"""

169

170

def disconnectAsync(self, ackCallback: callable = None) -> int:

171

"""

172

Disconnect from AWS IoT asynchronously.

173

174

Args:

175

ackCallback (callable): Callback for disconnect completion (mid, data) -> None

176

177

Returns:

178

int: Packet ID for tracking in callback

179

"""

180

```

181

182

### Publish Operations

183

184

Publish messages to AWS IoT topics with QoS support and asynchronous callbacks.

185

186

```python { .api }

187

def publish(self, topic: str, payload: str, QoS: int) -> bool:

188

"""

189

Publish message synchronously.

190

191

Args:

192

topic (str): MQTT topic name

193

payload (str): Message payload

194

QoS (int): Quality of Service (0 or 1)

195

196

Returns:

197

bool: True if publish request sent, False otherwise

198

"""

199

200

def publishAsync(self, topic: str, payload: str, QoS: int, ackCallback: callable = None) -> int:

201

"""

202

Publish message asynchronously.

203

204

Args:

205

topic (str): MQTT topic name

206

payload (str): Message payload

207

QoS (int): Quality of Service (0 or 1)

208

ackCallback (callable): Callback for PUBACK (mid) -> None (QoS 1 only)

209

210

Returns:

211

int: Packet ID for tracking in callback

212

"""

213

```

214

215

### Subscribe Operations

216

217

Subscribe to MQTT topics with message callbacks and QoS support.

218

219

```python { .api }

220

def subscribe(self, topic: str, QoS: int, callback: callable) -> bool:

221

"""

222

Subscribe to topic synchronously.

223

224

Args:

225

topic (str): MQTT topic name or filter

226

QoS (int): Quality of Service (0 or 1)

227

callback (callable): Message callback (client, userdata, message) -> None

228

229

Returns:

230

bool: True if subscribe successful, False otherwise

231

"""

232

233

def subscribeAsync(self, topic: str, QoS: int, ackCallback: callable = None, messageCallback: callable = None) -> int:

234

"""

235

Subscribe to topic asynchronously.

236

237

Args:

238

topic (str): MQTT topic name or filter

239

QoS (int): Quality of Service (0 or 1)

240

ackCallback (callable): Callback for SUBACK (mid, data) -> None

241

messageCallback (callable): Message callback (client, userdata, message) -> None

242

243

Returns:

244

int: Packet ID for tracking in callback

245

"""

246

247

def unsubscribe(self, topic: str) -> bool:

248

"""

249

Unsubscribe from topic synchronously.

250

251

Args:

252

topic (str): MQTT topic name or filter

253

254

Returns:

255

bool: True if unsubscribe successful, False otherwise

256

"""

257

258

def unsubscribeAsync(self, topic: str, ackCallback: callable = None) -> int:

259

"""

260

Unsubscribe from topic asynchronously.

261

262

Args:

263

topic (str): MQTT topic name or filter

264

ackCallback (callable): Callback for UNSUBACK (mid) -> None

265

266

Returns:

267

int: Packet ID for tracking in callback

268

"""

269

```

270

271

### Event Callbacks

272

273

Configure global callbacks for connection state and message events.

274

275

```python { .api }

276

def onOnline(self):

277

"""

278

Override this method to handle online events.

279

Called when client connects to AWS IoT.

280

"""

281

282

def onOffline(self):

283

"""

284

Override this method to handle offline events.

285

Called when client disconnects from AWS IoT.

286

"""

287

288

def onMessage(self, message):

289

"""

290

Override this method to handle all incoming messages.

291

292

Args:

293

message: MQTT message with .topic and .payload attributes

294

"""

295

```

296

297

### Metrics and Socket Configuration

298

299

Additional configuration options for metrics collection and custom socket factories.

300

301

```python { .api }

302

def enableMetricsCollection(self):

303

"""Enable SDK metrics collection (enabled by default)."""

304

305

def disableMetricsCollection(self):

306

"""Disable SDK metrics collection."""

307

308

def configureSocketFactory(self, socket_factory: callable):

309

"""

310

Configure custom socket factory for proxy support.

311

312

Args:

313

socket_factory (callable): Function that returns configured socket

314

"""

315

```

316

317

## Usage Examples

318

319

### Basic MQTT Publish/Subscribe

320

321

```python

322

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

323

324

# Create and configure client

325

client = AWSIoTPyMQTT.AWSIoTMQTTClient("myDevice")

326

client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)

327

client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

328

329

# Configure auto-reconnect

330

client.configureAutoReconnectBackoffTime(1, 32, 20)

331

client.configureOfflinePublishQueueing(-1) # Infinite queue

332

client.configureDrainingFrequency(2) # 2 Hz

333

334

# Define message callback

335

def customCallback(client, userdata, message):

336

print(f"Received message on {message.topic}: {message.payload.decode()}")

337

338

# Connect and subscribe

339

client.connect()

340

client.subscribe("device/data", 1, customCallback)

341

342

# Publish message

343

client.publish("device/status", "online", 0)

344

```

345

346

### WebSocket Connection with IAM

347

348

```python

349

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

350

351

# Create WebSocket client

352

client = AWSIoTPyMQTT.AWSIoTMQTTClient("webClient", useWebsocket=True)

353

client.configureEndpoint("endpoint.iot.region.amazonaws.com", 443)

354

client.configureIAMCredentials("AKIA...", "secret", "session-token")

355

356

# Connect and publish

357

client.connect()

358

client.publish("web/data", '{"temperature": 25.0}', 1)

359

client.disconnect()

360

```

361

362

### Asynchronous Operations

363

364

```python

365

import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT

366

367

client = AWSIoTPyMQTT.AWSIoTMQTTClient("asyncClient")

368

client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)

369

client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")

370

371

# Async connection callback

372

def connackCallback(mid, data):

373

print(f"Connected with mid: {mid}, result: {data}")

374

375

# Async publish callback

376

def pubackCallback(mid):

377

print(f"Published message mid: {mid}")

378

379

# Connect asynchronously

380

mid = client.connectAsync(ackCallback=connackCallback)

381

print(f"Connection request sent with mid: {mid}")

382

383

# Publish asynchronously

384

pub_mid = client.publishAsync("async/topic", "hello", 1, pubackCallback)

385

print(f"Publish request sent with mid: {pub_mid}")

386

```

387

388

## Types

389

390

```python { .api }

391

# Protocol version constants

392

MQTTv3_1 = 3

393

MQTTv3_1_1 = 4

394

395

# Queue drop behavior constants

396

DROP_OLDEST = 0

397

DROP_NEWEST = 1

398

399

# Message object (received in callbacks)

400

class Message:

401

topic: str # Message topic

402

payload: bytes # Message payload as bytes

403

```