or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-security.mdcore-client.mderror-handling.mdindex.mdmessage-handling.mdtopic-management.md

message-handling.mddocs/

0

# Message Handling

1

2

Message structure and processing capabilities for received MQTT messages, including topic matching, payload access, and message queue management.

3

4

## Capabilities

5

6

### Message Structure

7

8

The Message class represents an MQTT message received from the broker, providing access to all message components and metadata.

9

10

```python { .api }

11

@dataclass

12

class Message:

13

topic: Topic

14

payload: PayloadType

15

qos: int

16

retain: bool

17

mid: int

18

properties: Properties | None

19

20

def __lt__(self, other: Message) -> bool:

21

"""

22

Compare messages by message ID for ordering.

23

24

Args:

25

other (Message): Another message to compare against

26

27

Returns:

28

bool: True if this message's ID is less than the other's

29

"""

30

```

31

32

**Message attributes:**

33

34

- **topic**: The MQTT topic the message was published to, as a Topic object

35

- **payload**: The message payload, automatically decoded from bytes when possible

36

- **qos**: Quality of service level (0, 1, or 2) used for message delivery

37

- **retain**: Whether this message was retained on the broker

38

- **mid**: Unique message identifier assigned by the broker

39

- **properties**: MQTT v5.0 message properties (None for older protocol versions)

40

41

**Usage example:**

42

43

```python

44

import asyncio

45

from aiomqtt import Client

46

47

async def message_inspection():

48

async with Client("test.mosquitto.org") as client:

49

await client.subscribe("sensors/#")

50

51

async for message in client.messages:

52

# Access message components

53

print(f"Topic: {message.topic}")

54

print(f"Payload: {message.payload}")

55

print(f"QoS: {message.qos}")

56

print(f"Retained: {message.retain}")

57

print(f"Message ID: {message.mid}")

58

59

# Handle different payload types

60

if isinstance(message.payload, str):

61

print(f"Text payload: {message.payload}")

62

elif isinstance(message.payload, bytes):

63

print(f"Binary payload: {len(message.payload)} bytes")

64

elif isinstance(message.payload, (int, float)):

65

print(f"Numeric payload: {message.payload}")

66

elif message.payload is None:

67

print("Empty payload")

68

69

asyncio.run(message_inspection())

70

```

71

72

### Message Queue Iterator

73

74

The MessagesIterator provides an async iterator interface for receiving messages from the broker, with queue length inspection capabilities.

75

76

```python { .api }

77

class MessagesIterator:

78

def __aiter__(self) -> AsyncIterator[Message]:

79

"""

80

Return async iterator protocol.

81

82

Returns:

83

AsyncIterator[Message]: Async iterator for messages

84

"""

85

86

def __anext__(self) -> Message:

87

"""

88

Get next message from the queue.

89

90

Returns:

91

Message: Next received message

92

93

Raises:

94

StopAsyncIteration: When iteration is stopped

95

MqttError: If there's an error receiving messages

96

"""

97

98

def __len__(self) -> int:

99

"""

100

Get number of messages currently in the queue.

101

102

Returns:

103

int: Number of queued messages waiting to be processed

104

"""

105

```

106

107

**Usage examples:**

108

109

```python

110

import asyncio

111

from aiomqtt import Client

112

113

async def queue_monitoring():

114

async with Client("test.mosquitto.org") as client:

115

await client.subscribe("high-volume/data/#")

116

117

# Monitor queue length

118

message_count = 0

119

async for message in client.messages:

120

message_count += 1

121

queue_length = len(client.messages)

122

123

print(f"Processed {message_count} messages")

124

print(f"Queue length: {queue_length}")

125

126

# Handle queue buildup

127

if queue_length > 100:

128

print("Warning: High message queue buildup")

129

130

# Process message

131

print(f"Received: {message.payload} on {message.topic}")

132

133

async def selective_processing():

134

async with Client("test.mosquitto.org") as client:

135

await client.subscribe("sensors/#")

136

137

async for message in client.messages:

138

# Process only specific message types

139

if message.topic.matches("sensors/temperature"):

140

temp_value = float(message.payload)

141

print(f"Temperature: {temp_value}°C")

142

elif message.topic.matches("sensors/humidity"):

143

humidity_value = float(message.payload)

144

print(f"Humidity: {humidity_value}%")

145

else:

146

print(f"Ignoring message on {message.topic}")

147

148

# Run examples

149

asyncio.run(queue_monitoring())

150

```

151

152

### Message Filtering and Processing

153

154

Combine message iteration with topic matching for sophisticated message processing workflows.

155

156

**Usage examples:**

157

158

```python

159

import asyncio

160

from aiomqtt import Client, Topic, Wildcard

161

162

async def advanced_message_processing():

163

async with Client("test.mosquitto.org") as client:

164

# Subscribe to multiple topic patterns

165

await client.subscribe([

166

("sensors/+/temperature", 1),

167

("alerts/#", 2),

168

("status/+/online", 0)

169

])

170

171

# Define topic patterns for filtering

172

temp_wildcard = Wildcard("sensors/+/temperature")

173

alert_wildcard = Wildcard("alerts/#")

174

status_wildcard = Wildcard("status/+/online")

175

176

async for message in client.messages:

177

# Route messages based on topic patterns

178

if message.topic.matches(temp_wildcard):

179

await process_temperature(message)

180

elif message.topic.matches(alert_wildcard):

181

await process_alert(message)

182

elif message.topic.matches(status_wildcard):

183

await process_status(message)

184

else:

185

print(f"Unhandled message on {message.topic}")

186

187

async def process_temperature(message):

188

"""Process temperature sensor messages."""

189

try:

190

temp = float(message.payload)

191

device_id = str(message.topic).split('/')[1]

192

print(f"Device {device_id} temperature: {temp}°C")

193

194

if temp > 30:

195

print(f"Warning: High temperature on device {device_id}")

196

except (ValueError, IndexError) as e:

197

print(f"Error processing temperature message: {e}")

198

199

async def process_alert(message):

200

"""Process alert messages with priority handling."""

201

priority = "high" if message.qos == 2 else "normal"

202

print(f"Alert ({priority}): {message.payload}")

203

204

# Handle retained alerts

205

if message.retain:

206

print("This is a retained alert message")

207

208

async def process_status(message):

209

"""Process device status messages."""

210

device_id = str(message.topic).split('/')[1]

211

status = message.payload.decode() if isinstance(message.payload, bytes) else str(message.payload)

212

print(f"Device {device_id} status: {status}")

213

214

# Run advanced processing

215

asyncio.run(advanced_message_processing())

216

```

217

218

### Message Payload Handling

219

220

Handle different payload types and encodings safely.

221

222

**Usage example:**

223

224

```python

225

import asyncio

226

import json

227

from aiomqtt import Client

228

229

async def payload_handling():

230

async with Client("test.mosquitto.org") as client:

231

await client.subscribe("data/#")

232

233

async for message in client.messages:

234

payload = message.payload

235

236

# Handle different payload types

237

if payload is None:

238

print(f"Empty message on {message.topic}")

239

240

elif isinstance(payload, str):

241

# String payload - could be JSON, plain text, etc.

242

if payload.startswith('{') or payload.startswith('['):

243

try:

244

data = json.loads(payload)

245

print(f"JSON data: {data}")

246

except json.JSONDecodeError:

247

print(f"Text payload: {payload}")

248

else:

249

print(f"Text payload: {payload}")

250

251

elif isinstance(payload, bytes):

252

# Binary payload - attempt UTF-8 decode

253

try:

254

text = payload.decode('utf-8')

255

print(f"Decoded text: {text}")

256

except UnicodeDecodeError:

257

print(f"Binary data: {len(payload)} bytes")

258

259

elif isinstance(payload, (int, float)):

260

# Numeric payload

261

print(f"Numeric value: {payload}")

262

263

else:

264

print(f"Unknown payload type: {type(payload)}")

265

266

asyncio.run(payload_handling())

267

```

268

269

## Type Definitions

270

271

```python { .api }

272

PayloadType = str | bytes | bytearray | int | float | None

273

```