The idiomatic asyncio MQTT client
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Topic validation, wildcard support, and matching logic for MQTT topic patterns. Provides type-safe topic handling with validation and matching capabilities.
The Topic class provides validated MQTT topic handling with automatic validation and string conversion.
@dataclass
class Topic:
value: str
def __post_init__(self) -> None:
"""
Validate topic after initialization.
Raises:
ValueError: If topic contains invalid characters or patterns
"""
def __str__(self) -> str:
"""
Get string representation of topic.
Returns:
str: Topic as string
"""
def matches(self, wildcard: WildcardLike) -> bool:
"""
Check if this topic matches a wildcard pattern.
Args:
wildcard (WildcardLike): Wildcard pattern to match against
Returns:
bool: True if topic matches the wildcard pattern
"""
Usage examples:
from aiomqtt import Topic, Wildcard
# Create and validate topics
temp_topic = Topic("sensors/living-room/temperature")
humidity_topic = Topic("sensors/kitchen/humidity")
# String conversion
print(str(temp_topic)) # "sensors/living-room/temperature"
# Topic matching with wildcards
wildcard = Wildcard("sensors/+/temperature")
print(temp_topic.matches(wildcard)) # True
print(humidity_topic.matches(wildcard)) # False
# Multi-level wildcard matching
all_sensors = Wildcard("sensors/#")
print(temp_topic.matches(all_sensors)) # True
print(humidity_topic.matches(all_sensors)) # True
# Invalid topics raise ValueError
try:
invalid_topic = Topic("sensors/+/temperature") # + not allowed in topics
except ValueError as e:
print(f"Invalid topic: {e}")
The Wildcard class handles MQTT wildcard subscription patterns with validation and matching logic.
@dataclass
class Wildcard:
value: str
def __post_init__(self) -> None:
"""
Validate wildcard pattern after initialization.
Raises:
ValueError: If wildcard contains invalid patterns
"""
def __str__(self) -> str:
"""
Get string representation of wildcard.
Returns:
str: Wildcard pattern as string
"""
MQTT Wildcard Rules:
- Single-level wildcard (+): Matches any single topic level
- Multi-level wildcard (#): Matches any number of topic levels
- # must be the last character and preceded by / (except when it's the only character)
- + must occupy an entire topic level (between / characters)
Usage examples:
from aiomqtt import Topic, Wildcard
# Single-level wildcard patterns
temp_wildcard = Wildcard("sensors/+/temperature")
room_wildcard = Wildcard("home/+/+")
# Multi-level wildcard patterns
all_sensors = Wildcard("sensors/#")
all_topics = Wildcard("#")
# Test topic matching
living_room_temp = Topic("sensors/living-room/temperature")
kitchen_temp = Topic("sensors/kitchen/temperature")
outdoor_humidity = Topic("sensors/outdoor/humidity")
# Single-level wildcard matches
print(living_room_temp.matches(temp_wildcard)) # True
print(kitchen_temp.matches(temp_wildcard)) # True
print(outdoor_humidity.matches(temp_wildcard)) # False
# Multi-level wildcard matches
print(living_room_temp.matches(all_sensors)) # True
print(outdoor_humidity.matches(all_sensors)) # True
# Complex pattern matching
home_topic = Topic("home/living-room/lights")
print(home_topic.matches(room_wildcard)) # True
# Wildcard validation
try:
invalid_wildcard = Wildcard("sensors/+temperature") # Invalid + usage
except ValueError as e:
print(f"Invalid wildcard: {e}")
try:
invalid_wildcard = Wildcard("sensors/#/temperature") # # not at end
except ValueError as e:
print(f"Invalid wildcard: {e}")
Type aliases provide flexibility in function parameters, accepting both string and object forms.
TopicLike = str | Topic
WildcardLike = str | Wildcard
Usage in functions:
import asyncio
from aiomqtt import Client, Topic, Wildcard, TopicLike, WildcardLike
def process_topic(topic: TopicLike) -> None:
    """Print a topic supplied either as a plain string or as a Topic.

    Args:
        topic: A ``str``, which is wrapped in ``Topic`` before use, or any
            other value, which is used unchanged.
    """
    # Promote bare strings to Topic; anything else passes through as-is.
    topic_obj = Topic(topic) if isinstance(topic, str) else topic
    print(f"Processing topic: {topic_obj}")
def check_match(topic: TopicLike, pattern: WildcardLike) -> bool:
    """Report whether a topic matches a wildcard pattern.

    Args:
        topic: Topic to test; a ``str`` is wrapped in ``Topic`` first.
        pattern: Pattern to test against; a ``str`` is wrapped in
            ``Wildcard`` first.

    Returns:
        Whatever ``matches`` returns when called on the topic with the pattern.
    """
    # Coerce the topic before the pattern, so a bad topic string is reported first.
    topic_obj = Topic(topic) if isinstance(topic, str) else topic
    wildcard_obj = Wildcard(pattern) if isinstance(pattern, str) else pattern
    return topic_obj.matches(wildcard_obj)
# Usage examples
process_topic("sensors/temperature") # String
process_topic(Topic("sensors/humidity")) # Topic object
match1 = check_match("sensors/temp", "sensors/+") # Both strings
match2 = check_match(Topic("home/lights"), Wildcard("home/#")) # Both objects
print(f"Match 1: {match1}") # True
print(f"Match 2: {match2}") # True
Combine topic management with client operations for sophisticated subscription and publishing patterns.
Usage examples:
import asyncio
from aiomqtt import Client, Topic, Wildcard
async def advanced_topic_operations():
    """Subscribe with a wildcard, publish sample readings, and classify replies.

    Connects to ``test.mosquitto.org``, subscribes to ``home/+/+``, publishes
    one message per room/sensor pair, and then prints each received message
    according to which wildcard its topic matches. Iteration stops once as
    many messages have been received as were published.
    """
    # Topic hierarchy used for both publishing and subscribing.
    base_topic = "home"
    rooms = ["living-room", "kitchen", "bedroom"]
    sensors = ["temperature", "humidity", "light"]
    expected_total = len(rooms) * len(sensors)

    async with Client("test.mosquitto.org") as client:
        # A single two-level wildcard covers every room/sensor combination.
        all_sensors_wildcard = Wildcard(f"{base_topic}/+/+")
        await client.subscribe(str(all_sensors_wildcard))

        # Narrower patterns, used only to classify messages locally.
        temp_wildcard = Wildcard(f"{base_topic}/+/temperature")
        humidity_wildcard = Wildcard(f"{base_topic}/+/humidity")

        # Publish one test value for every room/sensor pair.
        for room_name in rooms:
            for sensor_name in sensors:
                outgoing = Topic(f"{base_topic}/{room_name}/{sensor_name}")
                await client.publish(str(outgoing), f"{sensor_name}_value")

        received = 0
        async for message in client.messages:
            received += 1
            incoming = message.topic

            if incoming.matches(temp_wildcard):
                # The room is the second level of "home/<room>/<sensor>".
                location = str(incoming).split('/')[1]
                print(f"Temperature in {location}: {message.payload}")
            elif incoming.matches(humidity_wildcard):
                location = str(incoming).split('/')[1]
                print(f"Humidity in {location}: {message.payload}")
            else:
                print(f"Other sensor data: {incoming} = {message.payload}")

            # Stop once every published test message has been accounted for.
            if received >= expected_total:
                break
async def topic_filtering_example():
    """Route incoming messages by matching their topics against wildcards.

    Subscribes to several sensor, alert and status patterns on
    ``test.mosquitto.org``, then prints each message with a prefix chosen by
    alert severity. Messages that match no alert pattern are printed only
    when their topic looks like ``sensors/<device>/<type>``. The receive loop
    has no exit condition of its own.
    """
    async with Client("test.mosquitto.org") as client:
        # Subscribe to every pattern of interest.
        patterns = [
            "sensors/+/temperature",
            "sensors/+/humidity",
            "alerts/#",
            "status/+",
        ]
        for pattern in patterns:
            await client.subscribe(pattern)

        # Severity-specific filters, checked in order of priority.
        critical_alerts = Wildcard("alerts/critical/#")
        warning_alerts = Wildcard("alerts/warning/#")
        info_alerts = Wildcard("alerts/info/#")

        async for message in client.messages:
            incoming = message.topic

            if incoming.matches(critical_alerts):
                print(f"CRITICAL ALERT: {message.payload}")
                continue
            if incoming.matches(warning_alerts):
                print(f"Warning: {message.payload}")
                continue
            if incoming.matches(info_alerts):
                print(f"Info: {message.payload}")
                continue

            # Not an alert: report it only if it is a sensor reading.
            topic_parts = str(incoming).split('/')
            if len(topic_parts) >= 3 and topic_parts[0] == "sensors":
                device_id, sensor_type = topic_parts[1], topic_parts[2]
                print(f"Sensor {device_id} {sensor_type}: {message.payload}")
# Run examples
asyncio.run(advanced_topic_operations())
Understanding MQTT topic validation rules for proper usage:
Valid Topics:
- sensors/temperature
- home/living-room/lights/status
- $SYS/broker/uptime (system topics starting with $)
- user/data/2023/01/15
Invalid Topics:
- sensors/+/temperature (+ wildcard not allowed in topics)
- home/# (# wildcard not allowed in topics)
- sensors//temperature (empty topic level)
- sensors/temp+data (+ not at level boundary)
Valid Wildcards:
- sensors/+ (single-level wildcard)
- sensors/+/temperature (single-level in middle)
- sensors/# (multi-level at end)
- # (all topics)
- sensors/+/+ (multiple single-level)
Invalid Wildcards:
- sensors/+temp (+ not at level boundary)
- sensors/#/temperature (# not at end)
- sensors/temp# (# not at level boundary)
MAX_TOPIC_LENGTH: int = 65535
Maximum allowed MQTT topic length according to the MQTT specification.
Install with Tessl CLI
npx tessl i tessl/pypi-aiomqtt