0
# Core MQTT Operations
1
2
Core MQTT connectivity for AWS IoT, secured either by TLS mutual authentication (X.509 certificates) or by MQTT over WebSocket with SigV4 request signing. Covers connection management, publish/subscribe operations, and advanced features such as auto-reconnect with progressive backoff and offline request queueing.
3
4
## Capabilities
5
6
### MQTT Client Creation
7
8
Create an MQTT client with flexible configuration options for protocol version, connection type, and session management.
9
10
```python { .api }
11
class AWSIoTMQTTClient:
12
def __init__(self, clientID: str, protocolType: int = MQTTv3_1_1, useWebsocket: bool = False, cleanSession: bool = True):
13
"""
14
Create AWS IoT MQTT client.
15
16
Args:
17
clientID (str): Client identifier for MQTT connection
18
protocolType (int): MQTT version (MQTTv3_1=3, MQTTv3_1_1=4)
19
useWebsocket (bool): Enable MQTT over WebSocket SigV4
20
cleanSession (bool): Start with clean session state
21
"""
22
```
23
24
### Connection Configuration
25
26
Configure endpoint, authentication, and connection parameters before establishing connection.
27
28
```python { .api }
29
def configureEndpoint(self, hostName: str, portNumber: int):
30
"""
31
Configure AWS IoT endpoint.
32
33
Args:
34
hostName (str): AWS IoT endpoint hostname
35
portNumber (int): Port (8883 for TLS, 443 for WebSocket/ALPN)
36
"""
37
38
def configureCredentials(self, CAFilePath: str, KeyPath: str = "", CertificatePath: str = "", Ciphers: str = None):
39
"""
40
Configure X.509 certificate credentials for TLS mutual authentication.
41
42
Args:
43
CAFilePath (str): Path to root CA certificate file
44
KeyPath (str): Path to private key file (required for TLS)
45
CertificatePath (str): Path to device certificate file (required for TLS)
46
Ciphers (str): SSL cipher suite string (optional)
47
"""
48
49
def configureIAMCredentials(self, AWSAccessKeyID: str, AWSSecretAccessKey: str, AWSSessionToken: str = ""):
50
"""
51
Configure IAM credentials for WebSocket SigV4 authentication.
52
53
Args:
54
AWSAccessKeyID (str): AWS access key ID
55
AWSSecretAccessKey (str): AWS secret access key
56
AWSSessionToken (str): AWS session token for temporary credentials
57
"""
58
59
def configureUsernamePassword(self, username: str, password: str = None):
60
"""
61
Configure MQTT username and password.
62
63
Args:
64
username (str): MQTT username
65
password (str): MQTT password (optional)
66
"""
67
68
def configureLastWill(self, topic: str, payload: str, QoS: int, retain: bool = False):
69
"""
70
Configure Last Will and Testament message for unexpected disconnections.
71
72
Args:
73
topic (str): Topic to publish last will message to
74
payload (str): Last will message payload
75
QoS (int): Quality of Service level (0 or 1)
76
retain (bool): Whether to retain the last will message
77
"""
78
79
def clearLastWill(self):
80
"""
81
Clear the previously configured Last Will and Testament.
82
"""
83
```
84
85
### Advanced Configuration
86
87
Configure auto-reconnect behavior, offline queueing, timeouts, and other advanced features.
88
89
```python { .api }
90
def configureAutoReconnectBackoffTime(self, baseReconnectQuietTimeSecond: int, maxReconnectQuietTimeSecond: int, stableConnectionTimeSecond: int):
91
"""
92
Configure progressive reconnect backoff timing.
93
94
Args:
95
baseReconnectQuietTimeSecond (int): Initial backoff time in seconds
96
maxReconnectQuietTimeSecond (int): Maximum backoff time in seconds
97
stableConnectionTimeSecond (int): Stable connection threshold in seconds
98
"""
99
100
def configureOfflinePublishQueueing(self, queueSize: int, dropBehavior: int = DROP_NEWEST):
101
"""
102
Configure offline request queueing for publish operations.
103
104
Args:
105
queueSize (int): Queue size (0=disabled, -1=infinite)
106
dropBehavior (int): DROP_OLDEST=0 or DROP_NEWEST=1
107
"""
108
109
def configureDrainingFrequency(self, frequencyInHz: float):
110
"""
111
Configure how fast queued offline requests are drained once the connection is restored.
112
113
Args:
114
frequencyInHz (float): Draining frequency in requests per second
115
"""
116
117
def configureConnectDisconnectTimeout(self, timeoutSecond: int):
118
"""
119
Configure connection/disconnection timeout.
120
121
Args:
122
timeoutSecond (int): Timeout in seconds for CONNACK/disconnect
123
"""
124
125
def configureMQTTOperationTimeout(self, timeoutSecond: int):
126
"""
127
Configure the timeout for MQTT operations that wait for an acknowledgement (QoS 1 publish, subscribe, and unsubscribe).
128
129
Args:
130
timeoutSecond (int): Timeout in seconds for PUBACK/SUBACK/UNSUBACK
131
"""
132
```
133
134
### Connection Management
135
136
Establish and manage MQTT connections with both synchronous and asynchronous options.
137
138
```python { .api }
139
def connect(self, keepAliveIntervalSecond: int = 600) -> bool:
140
"""
141
Connect to AWS IoT synchronously.
142
143
Args:
144
keepAliveIntervalSecond (int): MQTT keep-alive interval in seconds
145
146
Returns:
147
bool: True if connection successful, False otherwise
148
"""
149
150
def connectAsync(self, keepAliveIntervalSecond: int = 600, ackCallback: callable = None) -> int:
151
"""
152
Connect to AWS IoT asynchronously.
153
154
Args:
155
keepAliveIntervalSecond (int): MQTT keep-alive interval in seconds
156
ackCallback (callable): Callback for CONNACK (mid, data) -> None
157
158
Returns:
159
int: Packet ID for tracking in callback
160
"""
161
162
def disconnect(self) -> bool:
163
"""
164
Disconnect from AWS IoT synchronously.
165
166
Returns:
167
bool: True if disconnect successful, False otherwise
168
"""
169
170
def disconnectAsync(self, ackCallback: callable = None) -> int:
171
"""
172
Disconnect from AWS IoT asynchronously.
173
174
Args:
175
ackCallback (callable): Callback for disconnect completion (mid, data) -> None
176
177
Returns:
178
int: Packet ID for tracking in callback
179
"""
180
```
181
182
### Publish Operations
183
184
Publish messages to AWS IoT topics with QoS support and asynchronous callbacks.
185
186
```python { .api }
187
def publish(self, topic: str, payload: str, QoS: int) -> bool:
188
"""
189
Publish message synchronously.
190
191
Args:
192
topic (str): MQTT topic name
193
payload (str): Message payload
194
QoS (int): Quality of Service (0 or 1)
195
196
Returns:
197
bool: True if publish request sent, False otherwise
198
"""
199
200
def publishAsync(self, topic: str, payload: str, QoS: int, ackCallback: callable = None) -> int:
201
"""
202
Publish message asynchronously.
203
204
Args:
205
topic (str): MQTT topic name
206
payload (str): Message payload
207
QoS (int): Quality of Service (0 or 1)
208
ackCallback (callable): Callback for PUBACK (mid) -> None (QoS 1 only)
209
210
Returns:
211
int: Packet ID for tracking in callback
212
"""
213
```
214
215
### Subscribe Operations
216
217
Subscribe to MQTT topics with message callbacks and QoS support.
218
219
```python { .api }
220
def subscribe(self, topic: str, QoS: int, callback: callable) -> bool:
221
"""
222
Subscribe to topic synchronously.
223
224
Args:
225
topic (str): MQTT topic name or filter
226
QoS (int): Quality of Service (0 or 1)
227
callback (callable): Message callback (client, userdata, message) -> None
228
229
Returns:
230
bool: True if subscribe successful, False otherwise
231
"""
232
233
def subscribeAsync(self, topic: str, QoS: int, ackCallback: callable = None, messageCallback: callable = None) -> int:
234
"""
235
Subscribe to topic asynchronously.
236
237
Args:
238
topic (str): MQTT topic name or filter
239
QoS (int): Quality of Service (0 or 1)
240
ackCallback (callable): Callback for SUBACK (mid, data) -> None
241
messageCallback (callable): Message callback (client, userdata, message) -> None
242
243
Returns:
244
int: Packet ID for tracking in callback
245
"""
246
247
def unsubscribe(self, topic: str) -> bool:
248
"""
249
Unsubscribe from topic synchronously.
250
251
Args:
252
topic (str): MQTT topic name or filter
253
254
Returns:
255
bool: True if unsubscribe successful, False otherwise
256
"""
257
258
def unsubscribeAsync(self, topic: str, ackCallback: callable = None) -> int:
259
"""
260
Unsubscribe from topic asynchronously.
261
262
Args:
263
topic (str): MQTT topic name or filter
264
ackCallback (callable): Callback for UNSUBACK (mid) -> None
265
266
Returns:
267
int: Packet ID for tracking in callback
268
"""
269
```
270
271
### Event Callbacks
272
273
Configure global callbacks for connection state and message events.
274
275
```python { .api }
276
def onOnline(self):
277
"""
278
Override this method to handle online events.
279
Called when client connects to AWS IoT.
280
"""
281
282
def onOffline(self):
283
"""
284
Override this method to handle offline events.
285
Called when client disconnects from AWS IoT.
286
"""
287
288
def onMessage(self, message):
289
"""
290
Override this method to handle all incoming messages.
291
292
Args:
293
message: MQTT message with .topic and .payload attributes
294
"""
295
```
296
297
### Metrics and Socket Configuration
298
299
Additional configuration options for metrics collection and custom socket factories.
300
301
```python { .api }
302
def enableMetricsCollection(self):
303
"""Enable SDK metrics collection (enabled by default)."""
304
305
def disableMetricsCollection(self):
306
"""Disable SDK metrics collection."""
307
308
def configureSocketFactory(self, socket_factory: callable):
309
"""
310
Configure custom socket factory for proxy support.
311
312
Args:
313
socket_factory (callable): Function that returns configured socket
314
"""
315
```
316
317
## Usage Examples
318
319
### Basic MQTT Publish/Subscribe
320
321
```python
322
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
323
324
# Create and configure client
325
client = AWSIoTPyMQTT.AWSIoTMQTTClient("myDevice")
326
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
327
client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
328
329
# Configure auto-reconnect
330
client.configureAutoReconnectBackoffTime(1, 32, 20)
331
client.configureOfflinePublishQueueing(-1) # Infinite queue
332
client.configureDrainingFrequency(2) # 2 Hz
333
334
# Define message callback
335
def customCallback(client, userdata, message):
336
print(f"Received message on {message.topic}: {message.payload.decode()}")
337
338
# Connect and subscribe
339
client.connect()
340
client.subscribe("device/data", 1, customCallback)
341
342
# Publish message
343
client.publish("device/status", "online", 0)
344
```
345
346
### WebSocket Connection with IAM
347
348
```python
349
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
350
351
# Create WebSocket client
352
client = AWSIoTPyMQTT.AWSIoTMQTTClient("webClient", useWebsocket=True)
353
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 443)
354
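client.configureCredentials("rootCA.crt")  # Root CA is still required to verify the server over WebSocket
client.configureIAMCredentials("AKIA...", "secret", "session-token")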
355
356
# Connect and publish
357
client.connect()
358
client.publish("web/data", '{"temperature": 25.0}', 1)
359
client.disconnect()
360
```
361
362
### Asynchronous Operations
363
364
```python
365
import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT
366
367
client = AWSIoTPyMQTT.AWSIoTMQTTClient("asyncClient")
368
client.configureEndpoint("endpoint.iot.region.amazonaws.com", 8883)
369
client.configureCredentials("rootCA.crt", "private.key", "certificate.crt")
370
371
# Async connection callback
372
def connackCallback(mid, data):
373
print(f"Connected with mid: {mid}, result: {data}")
374
375
# Async publish callback
376
def pubackCallback(mid):
377
print(f"Published message mid: {mid}")
378
379
# Connect asynchronously
380
mid = client.connectAsync(ackCallback=connackCallback)
381
print(f"Connection request sent with mid: {mid}")
382
383
# Publish asynchronously
384
pub_mid = client.publishAsync("async/topic", "hello", 1, pubackCallback)
385
print(f"Publish request sent with mid: {pub_mid}")
386
```
387
388
## Types
389
390
```python { .api }
391
# Protocol version constants
392
MQTTv3_1 = 3
393
MQTTv3_1_1 = 4
394
395
# Queue drop behavior constants
396
DROP_OLDEST = 0
397
DROP_NEWEST = 1
398
399
# Message object (received in callbacks)
400
class Message:
401
topic: str # Message topic
402
payload: bytes # Message payload as bytes
403
```