0
# Message Handling
1
2
Message structure and processing capabilities for received MQTT messages, including topic matching, payload access, and message queue management.
3
4
## Capabilities
5
6
### Message Structure
7
8
The Message class represents an MQTT message received from the broker, providing access to all message components and metadata.
9
10
```python { .api }
11
@dataclass
class Message:
    """An MQTT message received from the broker."""

    topic: Topic  # topic the message was published to
    payload: PayloadType  # message body
    qos: int  # quality of service level used for delivery
    retain: bool  # whether the message was retained on the broker
    mid: int  # message identifier
    properties: Properties | None  # MQTT v5.0 properties, if any

    def __lt__(self, other: Message) -> bool:
        """
        Compare messages by message ID for ordering.

        Args:
            other (Message): Another message to compare against

        Returns:
            bool: True if this message's ID is less than the other's
        """
30
```
31
32
**Message attributes:**
33
34
- **topic**: The MQTT topic the message was published to, as a Topic object
35
- **payload**: The message payload; received payloads normally arrive as raw `bytes`, so decode them explicitly (for example `message.payload.decode()`) when text is expected
36
- **qos**: Quality of service level (0, 1, or 2) used for message delivery
37
- **retain**: Whether this message was retained on the broker
38
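- **mid**: Message (packet) identifier of the delivery; only meaningful for QoS 1 and 2, and identifiers are reused over time, so it is not a globally unique ID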
39
- **properties**: MQTT v5.0 message properties (None for older protocol versions)
40
41
**Usage example:**
42
43
```python
44
import asyncio
45
from aiomqtt import Client
46
47
async def message_inspection():
    """Print every component of each message received on ``sensors/#``."""
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("sensors/#")

        async for message in client.messages:
            # Access message components
            print(f"Topic: {message.topic}")
            print(f"Payload: {message.payload}")
            print(f"QoS: {message.qos}")
            print(f"Retained: {message.retain}")
            print(f"Message ID: {message.mid}")

            # Handle different payload types
            if isinstance(message.payload, str):
                print(f"Text payload: {message.payload}")
            elif isinstance(message.payload, (bytes, bytearray)):
                # bytearray is part of PayloadType too, so treat it as binary
                print(f"Binary payload: {len(message.payload)} bytes")
            elif isinstance(message.payload, (int, float)):
                print(f"Numeric payload: {message.payload}")
            elif message.payload is None:
                print("Empty payload")
68
69
asyncio.run(message_inspection())
70
```
71
72
### Message Queue Iterator
73
74
The MessagesIterator provides an async iterator interface for receiving messages from the broker, with queue length inspection capabilities.
75
76
```python { .api }
77
class MessagesIterator:
    """Async iterator over messages received from the broker."""

    def __aiter__(self) -> AsyncIterator[Message]:
        """
        Return async iterator protocol.

        Returns:
            AsyncIterator[Message]: Async iterator for messages
        """

    # The async-iterator protocol requires __anext__ to return an awaitable,
    # so it is a coroutine function rather than a plain method.
    async def __anext__(self) -> Message:
        """
        Get next message from the queue.

        Returns:
            Message: Next received message

        Raises:
            StopAsyncIteration: When iteration is stopped
            MqttError: If there's an error receiving messages
        """

    def __len__(self) -> int:
        """
        Get number of messages currently in the queue.

        Returns:
            int: Number of queued messages waiting to be processed
        """
105
```
106
107
**Usage examples:**
108
109
```python
110
import asyncio
111
from aiomqtt import Client
112
113
async def queue_monitoring():
    """Process messages on ``high-volume/data/#`` while reporting the queue backlog."""
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("high-volume/data/#")

        # Monitor queue length
        message_count = 0
        async for message in client.messages:
            message_count += 1
            # len() on the iterator reports the messages still waiting in the queue
            queue_length = len(client.messages)

            print(f"Processed {message_count} messages")
            print(f"Queue length: {queue_length}")

            # Handle queue buildup
            if queue_length > 100:
                print("Warning: High message queue buildup")

            # Process message
            print(f"Received: {message.payload} on {message.topic}")
132
133
async def selective_processing():
    """Handle only temperature and humidity readings from ``sensors/#``.

    Payloads on the two handled topics are converted with ``float()``, so
    they are expected to be numeric.
    """
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("sensors/#")

        async for message in client.messages:
            # Process only specific message types
            if message.topic.matches("sensors/temperature"):
                temp_value = float(message.payload)
                print(f"Temperature: {temp_value}°C")
            elif message.topic.matches("sensors/humidity"):
                humidity_value = float(message.payload)
                print(f"Humidity: {humidity_value}%")
            else:
                print(f"Ignoring message on {message.topic}")
147
148
# Run examples
149
asyncio.run(queue_monitoring())
150
```
151
152
### Message Filtering and Processing
153
154
Combine message iteration with topic matching for sophisticated message processing workflows.
155
156
**Usage examples:**
157
158
```python
159
import asyncio
160
from aiomqtt import Client, Topic, Wildcard
161
162
async def advanced_message_processing():
    """Subscribe to several topic patterns and route each message to its handler."""
    async with Client("test.mosquitto.org") as client:
        # Subscribe to multiple topic patterns
        await client.subscribe([
            ("sensors/+/temperature", 1),
            ("alerts/#", 2),
            ("status/+/online", 0)
        ])

        # Define topic patterns for filtering
        temp_wildcard = Wildcard("sensors/+/temperature")
        alert_wildcard = Wildcard("alerts/#")
        status_wildcard = Wildcard("status/+/online")

        async for message in client.messages:
            # Route messages based on topic patterns
            if message.topic.matches(temp_wildcard):
                await process_temperature(message)
            elif message.topic.matches(alert_wildcard):
                await process_alert(message)
            elif message.topic.matches(status_wildcard):
                await process_status(message)
            else:
                print(f"Unhandled message on {message.topic}")
186
187
async def process_temperature(message):
    """Process temperature sensor messages.

    Reads the payload as a float and takes the device ID from the second
    topic level. Malformed messages are reported instead of raised, so one
    bad message does not stop the caller's loop.
    """
    try:
        temp = float(message.payload)
        device_id = str(message.topic).split('/')[1]
        print(f"Device {device_id} temperature: {temp}°C")

        if temp > 30:
            print(f"Warning: High temperature on device {device_id}")
    # TypeError covers an empty (None) payload, which float() rejects
    except (ValueError, TypeError, IndexError) as e:
        print(f"Error processing temperature message: {e}")
198
199
async def process_alert(message):
    """Process alert messages with priority handling.

    Alerts delivered with QoS 2 are labelled "high"; all others "normal".
    """
    priority = "high" if message.qos == 2 else "normal"
    print(f"Alert ({priority}): {message.payload}")

    # Handle retained alerts
    if message.retain:
        print("This is a retained alert message")
207
208
async def process_status(message):
    """Process device status messages.

    The device ID is the second topic level. Binary payloads are decoded to
    text; any other payload is shown via ``str()``.
    """
    device_id = str(message.topic).split('/')[1]
    payload = message.payload
    # bytearray is a valid payload type as well and decodes the same way
    if isinstance(payload, (bytes, bytearray)):
        status = payload.decode()
    else:
        status = str(payload)
    print(f"Device {device_id} status: {status}")
213
214
# Run advanced processing
215
asyncio.run(advanced_message_processing())
216
```
217
218
### Message Payload Handling
219
220
Handle different payload types and encodings safely.
221
222
**Usage example:**
223
224
```python
225
import asyncio
226
import json
227
from aiomqtt import Client
228
229
async def payload_handling():
    """Print each message received on ``data/#`` according to its payload type."""
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("data/#")

        async for message in client.messages:
            payload = message.payload

            # Handle different payload types
            if payload is None:
                print(f"Empty message on {message.topic}")

            elif isinstance(payload, str):
                # String payload - could be JSON, plain text, etc.
                if payload.startswith(('{', '[')):
                    try:
                        data = json.loads(payload)
                        print(f"JSON data: {data}")
                    except json.JSONDecodeError:
                        # Looked like JSON but was not; fall back to plain text
                        print(f"Text payload: {payload}")
                else:
                    print(f"Text payload: {payload}")

            elif isinstance(payload, (bytes, bytearray)):
                # Binary payload - attempt UTF-8 decode
                try:
                    text = payload.decode('utf-8')
                    print(f"Decoded text: {text}")
                except UnicodeDecodeError:
                    print(f"Binary data: {len(payload)} bytes")

            elif isinstance(payload, (int, float)):
                # Numeric payload
                print(f"Numeric value: {payload}")

            else:
                print(f"Unknown payload type: {type(payload)}")
265
266
asyncio.run(payload_handling())
267
```
268
269
## Type Definitions
270
271
```python { .api }
272
# Union of the value types a message payload may hold
PayloadType = str | bytes | bytearray | int | float | None
273
```