0
# Core Client Operations
1
2
Essential MQTT client functionality, including connection management, publishing, and subscribing. The Client class is the main entry point for all MQTT operations and is designed to be used as an async context manager.
3
4
## Capabilities
5
6
### Client Connection Management
7
8
The Client class provides async context manager support for automatic connection and disconnection management, eliminating the need for manual connection lifecycle handling.
9
10
```python { .api }
11
class Client:
12
def __init__(
13
self,
14
hostname: str,
15
port: int = 1883,
16
*,
17
username: str | None = None,
18
password: str | None = None,
19
logger: logging.Logger | None = None,
20
identifier: str | None = None,
21
queue_type: type[asyncio.Queue[Message]] | None = None,
22
protocol: ProtocolVersion | None = None,
23
will: Will | None = None,
24
clean_session: bool | None = None,
25
transport: Literal["tcp", "websockets", "unix"] = "tcp",
26
timeout: float | None = None,
27
keepalive: int = 60,
28
bind_address: str = "",
29
bind_port: int = 0,
30
clean_start: mqtt.CleanStartOption = mqtt.MQTT_CLEAN_START_FIRST_ONLY,
31
max_queued_incoming_messages: int | None = None,
32
max_queued_outgoing_messages: int | None = None,
33
max_inflight_messages: int | None = None,
34
max_concurrent_outgoing_calls: int | None = None,
35
properties: Properties | None = None,
36
tls_context: ssl.SSLContext | None = None,
37
tls_params: TLSParameters | None = None,
38
tls_insecure: bool | None = None,
39
proxy: ProxySettings | None = None,
40
socket_options: Iterable[SocketOption] | None = None,
41
websocket_path: str | None = None,
42
websocket_headers: WebSocketHeaders | None = None,
43
):
44
"""
45
Initialize MQTT client with connection parameters.
46
47
Args:
48
hostname (str): MQTT broker hostname or IP address
49
port (int): MQTT broker port, defaults to 1883
50
username (str, optional): Authentication username
51
password (str, optional): Authentication password
52
logger (logging.Logger, optional): Custom logger instance
53
identifier (str, optional): Client ID, auto-generated if None
54
queue_type (type, optional): Custom message queue class
55
protocol (ProtocolVersion, optional): MQTT protocol version
56
will (Will, optional): Last will and testament message
57
clean_session (bool, optional): Clean session flag for MQTT v3.1.1
58
transport (str): Transport protocol - "tcp", "websockets", or "unix"
59
timeout (float, optional): Default timeout for operations
60
keepalive (int): Keep-alive interval in seconds
61
bind_address (str): Local interface to bind to
62
bind_port (int): Local port to bind to
63
clean_start (mqtt.CleanStartOption): MQTT v5.0 clean start option
64
max_queued_incoming_messages (int, optional): Incoming message queue limit
65
max_queued_outgoing_messages (int, optional): Outgoing message queue limit
66
max_inflight_messages (int, optional): Maximum inflight messages
67
max_concurrent_outgoing_calls (int, optional): Concurrency limit
68
properties (Properties, optional): MQTT v5.0 connection properties
69
tls_context (ssl.SSLContext, optional): Pre-configured SSL context
70
tls_params (TLSParameters, optional): SSL/TLS configuration parameters
71
tls_insecure (bool, optional): Disable hostname verification
72
proxy (ProxySettings, optional): Proxy configuration
73
socket_options (Iterable, optional): Socket options
74
websocket_path (str, optional): WebSocket path for websocket transport
75
websocket_headers (WebSocketHeaders, optional): WebSocket headers
76
"""
77
78
async def __aenter__(self) -> Self:
79
"""
80
Connect to MQTT broker when entering async context.
81
82
Returns:
83
Self: The connected client instance
84
85
Raises:
86
MqttError: If connection fails
87
"""
88
89
async def __aexit__(
90
self,
91
exc_type: type[BaseException] | None,
92
exc: BaseException | None,
93
tb: TracebackType | None,
94
) -> None:
95
"""
96
Disconnect from MQTT broker when exiting async context.
97
98
Args:
99
exc_type: Exception type if context exited with exception
100
exc: Exception instance if context exited with exception
101
tb: Traceback if context exited with exception
102
"""
103
```
104
105
**Usage example:**
106
107
```python
108
import asyncio
109
from aiomqtt import Client
110
111
async def basic_connection():
112
# Automatic connection and disconnection
113
async with Client("test.mosquitto.org") as client:
114
print(f"Connected with client ID: {client.identifier}")
115
# Client automatically disconnects when exiting context
116
117
asyncio.run(basic_connection())
118
```
119
120
### Message Publishing
121
122
Publish messages to MQTT topics with support for quality of service, retained messages, and MQTT v5.0 properties.
123
124
```python { .api }
125
async def publish(
126
self,
127
/,
128
topic: str,
129
payload: PayloadType = None,
130
qos: int = 0,
131
retain: bool = False,
132
properties: Properties | None = None,
133
*args: Any,
134
timeout: float | None = None,
135
**kwargs: Any,
136
) -> None:
137
"""
138
Publish a message to an MQTT topic.
139
140
Args:
141
topic (str): Target topic for the message
142
payload (PayloadType, optional): Message payload, defaults to None
143
qos (int): Quality of service level (0, 1, or 2), defaults to 0
144
retain (bool): Whether to retain the message on the broker, defaults to False
145
properties (Properties, optional): MQTT v5.0 message properties
146
timeout (float, optional): Operation timeout, uses client default if None
147
148
Raises:
149
MqttError: If publish operation fails
150
MqttCodeError: If broker returns an error code
151
"""
152
```
153
154
**Usage examples:**
155
156
```python
157
import asyncio
158
from aiomqtt import Client
159
160
async def publish_examples():
161
async with Client("test.mosquitto.org") as client:
162
# Simple text message
163
await client.publish("sensors/temperature", "23.5")
164
165
# Binary payload
166
await client.publish("sensors/image", b"binary_image_data")
167
168
# Numeric payload
169
await client.publish("sensors/humidity", 65.2)
170
171
# QoS 1 with retain flag
172
await client.publish(
173
"status/online",
174
"connected",
175
qos=1,
176
retain=True
177
)
178
179
# With custom timeout
180
await client.publish(
181
"slow/topic",
182
"data",
183
timeout=10.0
184
)
185
186
asyncio.run(publish_examples())
187
```
188
189
### Topic Subscription
190
191
Subscribe to MQTT topics and wildcards with support for multiple QoS levels and subscription options.
192
193
```python { .api }
194
async def subscribe(
195
self,
196
/,
197
topic: SubscribeTopic,
198
qos: int = 0,
199
options: SubscribeOptions | None = None,
200
properties: Properties | None = None,
201
*args: Any,
202
timeout: float | None = None,
203
**kwargs: Any,
204
) -> tuple[int, ...] | list[ReasonCode]:
205
"""
206
Subscribe to one or more MQTT topics.
207
208
Args:
209
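topic: Topic(s) to subscribe to (see SubscribeTopic) - can be:
210
- str: Single topic or wildcard pattern
211
- tuple[str, SubscribeOptions]: Single topic with its own subscription options
212
- list[tuple[str, SubscribeOptions]]: Multiple topics, each with its own subscription options
213
- list[tuple[str, int]]: Multiple topics, each with its own QoS level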
214
qos (int): Quality of service level, defaults to 0
215
options (SubscribeOptions, optional): MQTT v5.0 subscription options
216
properties (Properties, optional): MQTT v5.0 subscription properties
217
timeout (float, optional): Operation timeout, uses client default if None
218
219
Returns:
220
tuple[int, ...] | list[ReasonCode]: Granted QoS levels or reason codes
221
222
Raises:
223
MqttError: If subscription operation fails
224
MqttCodeError: If broker returns an error code
225
"""
226
227
async def unsubscribe(
228
self,
229
topic: str | Topic | Wildcard | list[str | Topic | Wildcard],
230
properties: Properties | None = None,
231
timeout: float | None = None,
232
) -> None:
233
"""
234
Unsubscribe from one or more MQTT topics.
235
236
Args:
237
topic: Topic(s) to unsubscribe from
238
properties (Properties, optional): MQTT v5.0 properties
239
timeout (float, optional): Operation timeout, uses client default if None
240
241
Raises:
242
MqttError: If unsubscribe operation fails
243
"""
244
```
245
246
**Usage examples:**
247
248
```python
249
import asyncio
250
from aiomqtt import Client
251
252
async def subscription_examples():
253
async with Client("test.mosquitto.org") as client:
254
# Simple topic subscription
255
await client.subscribe("sensors/temperature")
256
257
# Wildcard subscriptions
258
await client.subscribe("sensors/+/temperature") # Single level wildcard
259
await client.subscribe("sensors/#") # Multi-level wildcard
260
261
# Multiple topics with different QoS
262
await client.subscribe([
263
("sensors/temperature", 0),
264
("sensors/humidity", 1),
265
("alerts/#", 2)
266
])
267
268
# QoS 1 subscription
269
await client.subscribe("important/data", qos=1)
270
271
# Unsubscribe
272
await client.unsubscribe("sensors/temperature")
273
await client.unsubscribe(["sensors/+/temp", "alerts/#"])
274
275
asyncio.run(subscription_examples())
276
```
277
278
### Message Reception
279
280
Access received messages through the async iterator interface provided by the messages property.
281
282
```python { .api }
283
@property
284
def messages(self) -> MessagesIterator:
285
"""
286
Get async iterator for received messages.
287
288
Returns:
289
MessagesIterator: Iterator for received messages
290
"""
291
292
class MessagesIterator:
293
def __aiter__(self) -> AsyncIterator[Message]:
294
"""Return async iterator."""
295
296
async def __anext__(self) -> Message:
297
"""Get next message from queue."""
298
299
def __len__(self) -> int:
300
"""
301
Get number of queued messages.
302
303
Returns:
304
int: Number of messages in queue
305
"""
306
```
307
308
**Usage example:**
309
310
```python
311
import asyncio
312
from aiomqtt import Client
313
314
async def message_reception():
315
async with Client("test.mosquitto.org") as client:
316
await client.subscribe("sensors/#")
317
318
# Receive messages indefinitely
319
async for message in client.messages:
320
print(f"Topic: {message.topic}")
321
print(f"Payload: {message.payload}")
322
print(f"QoS: {message.qos}")
323
324
# Process specific topics
325
if message.topic.matches("sensors/temperature"):
326
temperature = float(message.payload)
327
print(f"Temperature: {temperature}°C")
328
329
# Check queue length
330
print(f"Messages in queue: {len(client.messages)}")
331
332
asyncio.run(message_reception())
333
```
334
335
### Client Properties
336
337
Access client configuration and status information.
338
339
```python { .api }
340
@property
341
def identifier(self) -> str:
342
"""
343
Get client identifier.
344
345
Returns:
346
str: MQTT client identifier
347
"""
348
349
@property
350
def pending_calls_threshold(self) -> int:
351
"""
352
Get warning threshold for pending calls.
353
354
Returns:
355
int: Threshold value for pending calls warning
356
"""
357
358
@property
359
def timeout(self) -> float:
360
"""
361
Get default timeout value.
362
363
Returns:
364
float: Default timeout in seconds
365
"""
366
```
367
368
## Type Definitions
369
370
```python { .api }
371
PayloadType = str | bytes | bytearray | int | float | None
372
373
SubscribeTopic = (
374
str
375
| tuple[str, SubscribeOptions]
376
| list[tuple[str, SubscribeOptions]]
377
| list[tuple[str, int]]
378
)
379
380
WebSocketHeaders = dict[str, str] | Callable[[dict[str, str]], dict[str, str]]
381
382
PahoSocket = socket.socket | ssl.SSLSocket | Any
383
384
SocketOption = tuple[int, int, int | bytes] | tuple[int, int, None, int]
385
```