0
# Topic Management
1
2
Topic validation, wildcard support, and matching logic for MQTT topic patterns. Provides type-safe topic handling with validation and matching capabilities.
3
4
## Capabilities
5
6
### Topic Validation and Representation
7
8
The Topic class provides validated MQTT topic handling with automatic validation and string conversion.
9
10
```python { .api }
11
@dataclass
12
class Topic:
13
value: str
14
15
def __post_init__(self) -> None:
16
"""
17
Validate topic after initialization.
18
19
Raises:
20
ValueError: If topic contains invalid characters or patterns
21
"""
22
23
def __str__(self) -> str:
24
"""
25
Get string representation of topic.
26
27
Returns:
28
str: Topic as string
29
"""
30
31
def matches(self, wildcard: WildcardLike) -> bool:
32
"""
33
Check if this topic matches a wildcard pattern.
34
35
Args:
36
wildcard (WildcardLike): Wildcard pattern to match against
37
38
Returns:
39
bool: True if topic matches the wildcard pattern
40
"""
41
```
42
43
**Usage examples:**
44
45
```python
46
from aiomqtt import Topic, Wildcard
47
48
# Create and validate topics
49
temp_topic = Topic("sensors/living-room/temperature")
50
humidity_topic = Topic("sensors/kitchen/humidity")
51
52
# String conversion
53
print(str(temp_topic)) # "sensors/living-room/temperature"
54
55
# Topic matching with wildcards
56
wildcard = Wildcard("sensors/+/temperature")
57
print(temp_topic.matches(wildcard)) # True
58
print(humidity_topic.matches(wildcard)) # False
59
60
# Multi-level wildcard matching
61
all_sensors = Wildcard("sensors/#")
62
print(temp_topic.matches(all_sensors)) # True
63
print(humidity_topic.matches(all_sensors)) # True
64
65
# Invalid topics raise ValueError
66
try:
67
invalid_topic = Topic("sensors/+/temperature") # + not allowed in topics
68
except ValueError as e:
69
print(f"Invalid topic: {e}")
70
```
71
72
### Wildcard Pattern Matching
73
74
The Wildcard class handles MQTT wildcard subscription patterns with validation and matching logic.
75
76
```python { .api }
77
@dataclass
78
class Wildcard:
79
value: str
80
81
def __post_init__(self) -> None:
82
"""
83
Validate wildcard pattern after initialization.
84
85
Raises:
86
ValueError: If wildcard contains invalid patterns
87
"""
88
89
def __str__(self) -> str:
90
"""
91
Get string representation of wildcard.
92
93
Returns:
94
str: Wildcard pattern as string
95
"""
96
```
97
98
**MQTT Wildcard Rules:**
99
- **Single-level wildcard (`+`)**: Matches any single topic level
100
- **Multi-level wildcard (`#`)**: Matches any number of topic levels
101
- `#` must be the last character and preceded by `/` (except when it's the only character)
102
- `+` must occupy an entire topic level (between `/` characters)
103
104
**Usage examples:**
105
106
```python
107
from aiomqtt import Topic, Wildcard
108
109
# Single-level wildcard patterns
110
temp_wildcard = Wildcard("sensors/+/temperature")
111
room_wildcard = Wildcard("home/+/+")
112
113
# Multi-level wildcard patterns
114
all_sensors = Wildcard("sensors/#")
115
all_topics = Wildcard("#")
116
117
# Test topic matching
118
living_room_temp = Topic("sensors/living-room/temperature")
119
kitchen_temp = Topic("sensors/kitchen/temperature")
120
outdoor_humidity = Topic("sensors/outdoor/humidity")
121
122
# Single-level wildcard matches
123
print(living_room_temp.matches(temp_wildcard)) # True
124
print(kitchen_temp.matches(temp_wildcard)) # True
125
print(outdoor_humidity.matches(temp_wildcard)) # False
126
127
# Multi-level wildcard matches
128
print(living_room_temp.matches(all_sensors)) # True
129
print(outdoor_humidity.matches(all_sensors)) # True
130
131
# Complex pattern matching
132
home_topic = Topic("home/living-room/lights")
133
print(home_topic.matches(room_wildcard)) # True
134
135
# Wildcard validation
136
try:
137
invalid_wildcard = Wildcard("sensors/+temperature") # Invalid + usage
138
except ValueError as e:
139
print(f"Invalid wildcard: {e}")
140
141
try:
142
invalid_wildcard = Wildcard("sensors/#/temperature") # # not at end
143
except ValueError as e:
144
print(f"Invalid wildcard: {e}")
145
```
146
147
### Type Aliases for Flexible Usage
148
149
Type aliases provide flexibility in function parameters, accepting both string and object forms.
150
151
```python { .api }
152
TopicLike = str | Topic
153
WildcardLike = str | Wildcard
154
```
155
156
**Usage in functions:**
157
158
```python
159
import asyncio
160
from aiomqtt import Client, Topic, Wildcard, TopicLike, WildcardLike
161
162
def process_topic(topic: TopicLike) -> None:
163
"""Process a topic, accepting string or Topic object."""
164
if isinstance(topic, str):
165
topic_obj = Topic(topic)
166
else:
167
topic_obj = topic
168
169
print(f"Processing topic: {topic_obj}")
170
171
def check_match(topic: TopicLike, pattern: WildcardLike) -> bool:
172
"""Check if topic matches wildcard pattern."""
173
# Convert to objects if needed
174
if isinstance(topic, str):
175
topic_obj = Topic(topic)
176
else:
177
topic_obj = topic
178
179
if isinstance(pattern, str):
180
wildcard_obj = Wildcard(pattern)
181
else:
182
wildcard_obj = pattern
183
184
return topic_obj.matches(wildcard_obj)
185
186
# Usage examples
187
process_topic("sensors/temperature") # String
188
process_topic(Topic("sensors/humidity")) # Topic object
189
190
match1 = check_match("sensors/temp", "sensors/+") # Both strings
191
match2 = check_match(Topic("home/lights"), Wildcard("home/#")) # Both objects
192
print(f"Match 1: {match1}") # True
193
print(f"Match 2: {match2}") # True
194
```
195
196
### Advanced Topic Operations
197
198
Combine topic management with client operations for sophisticated subscription and publishing patterns.
199
200
**Usage examples:**
201
202
```python
203
import asyncio
204
from aiomqtt import Client, Topic, Wildcard
205
206
async def advanced_topic_operations():
207
async with Client("test.mosquitto.org") as client:
208
# Define topic hierarchy
209
base_topic = "home"
210
rooms = ["living-room", "kitchen", "bedroom"]
211
sensors = ["temperature", "humidity", "light"]
212
213
# Subscribe to all sensor data using wildcards
214
all_sensors_wildcard = Wildcard(f"{base_topic}/+/+")
215
await client.subscribe(str(all_sensors_wildcard))
216
217
# Create specific topic patterns for filtering
218
temp_wildcard = Wildcard(f"{base_topic}/+/temperature")
219
humidity_wildcard = Wildcard(f"{base_topic}/+/humidity")
220
221
# Publish test data
222
for room in rooms:
223
for sensor in sensors:
224
topic = Topic(f"{base_topic}/{room}/{sensor}")
225
await client.publish(str(topic), f"{sensor}_value")
226
227
# Process received messages with topic matching
228
message_count = 0
229
async for message in client.messages:
230
message_count += 1
231
232
if message.topic.matches(temp_wildcard):
233
room = str(message.topic).split('/')[1]
234
print(f"Temperature in {room}: {message.payload}")
235
236
elif message.topic.matches(humidity_wildcard):
237
room = str(message.topic).split('/')[1]
238
print(f"Humidity in {room}: {message.payload}")
239
240
else:
241
print(f"Other sensor data: {message.topic} = {message.payload}")
242
243
# Stop after processing all test messages
244
if message_count >= len(rooms) * len(sensors):
245
break
246
247
async def topic_filtering_example():
248
"""Example of filtering messages by topic patterns."""
249
async with Client("test.mosquitto.org") as client:
250
# Subscribe to multiple patterns
251
patterns = [
252
"sensors/+/temperature",
253
"sensors/+/humidity",
254
"alerts/#",
255
"status/+"
256
]
257
258
for pattern in patterns:
259
await client.subscribe(pattern)
260
261
# Define filter wildcards
262
critical_alerts = Wildcard("alerts/critical/#")
263
warning_alerts = Wildcard("alerts/warning/#")
264
info_alerts = Wildcard("alerts/info/#")
265
266
async for message in client.messages:
267
# Route based on topic patterns
268
if message.topic.matches(critical_alerts):
269
print(f"CRITICAL ALERT: {message.payload}")
270
elif message.topic.matches(warning_alerts):
271
print(f"Warning: {message.payload}")
272
elif message.topic.matches(info_alerts):
273
print(f"Info: {message.payload}")
274
else:
275
# Handle sensor data or status updates
276
topic_parts = str(message.topic).split('/')
277
if len(topic_parts) >= 3 and topic_parts[0] == "sensors":
278
sensor_type = topic_parts[2]
279
device_id = topic_parts[1]
280
print(f"Sensor {device_id} {sensor_type}: {message.payload}")
281
282
# Run examples
283
asyncio.run(advanced_topic_operations())
284
```
285
286
### Topic Validation Rules
287
288
Understanding MQTT topic validation rules for proper usage:
289
290
**Valid Topics:**
291
- `sensors/temperature`
292
- `home/living-room/lights/status`
293
- `$SYS/broker/uptime` (system topics starting with $)
294
- `user/data/2023/01/15`
295
296
**Invalid Topics:**
297
- `sensors/+/temperature` (+ wildcard not allowed in topics)
298
- `home/#` (# wildcard not allowed in topics)
299
- `sensors//temperature` (empty topic level)
300
- `sensors/temp+data` (+ not at level boundary)
301
302
**Valid Wildcards:**
303
- `sensors/+` (single-level wildcard)
304
- `sensors/+/temperature` (single-level in middle)
305
- `sensors/#` (multi-level at end)
306
- `#` (all topics)
307
- `sensors/+/+` (multiple single-level)
308
309
**Invalid Wildcards:**
310
- `sensors/+temp` (+ not at level boundary)
311
- `sensors/#/temperature` (# not at end)
312
- `sensors/temp#` (# not at level boundary)
313
314
## Constants
315
316
```python { .api }
317
MAX_TOPIC_LENGTH: int = 65535
318
```
319
320
Maximum allowed MQTT topic length according to the MQTT specification.