tessl/pypi-aiomqtt

The idiomatic asyncio MQTT client


Topic Management

Validation and matching for MQTT topics and wildcard subscription patterns. The Topic and Wildcard classes validate their value on construction, and Topic.matches tests a topic against a pattern.

Capabilities

Topic Validation and Representation

The Topic class wraps an MQTT topic string. It validates the value on construction and converts back to the original string with str().

@dataclass
class Topic:
    value: str
    
    def __post_init__(self) -> None:
        """
        Validate topic after initialization.
        
        Raises:
            ValueError: If topic contains invalid characters or patterns
        """
    
    def __str__(self) -> str:
        """
        Get string representation of topic.
        
        Returns:
            str: Topic as string
        """
    
    def matches(self, wildcard: WildcardLike) -> bool:
        """
        Check if this topic matches a wildcard pattern.
        
        Args:
            wildcard (WildcardLike): Wildcard pattern to match against
            
        Returns:
            bool: True if topic matches the wildcard pattern
        """

Usage examples:

from aiomqtt import Topic, Wildcard

# Create and validate topics
temp_topic = Topic("sensors/living-room/temperature")
humidity_topic = Topic("sensors/kitchen/humidity")

# String conversion
print(str(temp_topic))  # "sensors/living-room/temperature"

# Topic matching with wildcards
wildcard = Wildcard("sensors/+/temperature")
print(temp_topic.matches(wildcard))    # True
print(humidity_topic.matches(wildcard)) # False

# Multi-level wildcard matching
all_sensors = Wildcard("sensors/#")
print(temp_topic.matches(all_sensors))    # True
print(humidity_topic.matches(all_sensors)) # True

# Invalid topics raise ValueError
try:
    invalid_topic = Topic("sensors/+/temperature")  # + not allowed in topics
except ValueError as e:
    print(f"Invalid topic: {e}")

Wildcard Pattern Matching

The Wildcard class represents an MQTT subscription pattern and validates it on construction. Matching a topic against a pattern is done through Topic.matches.

@dataclass
class Wildcard:
    value: str
    
    def __post_init__(self) -> None:
        """
        Validate wildcard pattern after initialization.
        
        Raises:
            ValueError: If wildcard contains invalid patterns
        """
    
    def __str__(self) -> str:
        """
        Get string representation of wildcard.
        
        Returns:
            str: Wildcard pattern as string
        """

MQTT Wildcard Rules:

  • Single-level wildcard (+): Matches any single topic level
  • Multi-level wildcard (#): Matches any number of topic levels, including zero, so sensors/# also matches sensors
  • # must be the last character and preceded by / (except when it's the only character)
  • + must occupy an entire topic level (between / characters)
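
The rules above can be sketched as a standalone check. This is an illustration only, not the library's implementation: the helper name is_valid_wildcard is hypothetical, and the length limit reuses the MAX_TOPIC_LENGTH value documented on this page.

```python
MAX_TOPIC_LENGTH = 65535  # same value as the constant documented on this page


def is_valid_wildcard(pattern: str) -> bool:
    """Hypothetical helper: check a subscription pattern against the wildcard rules."""
    if not pattern or len(pattern) > MAX_TOPIC_LENGTH:
        return False
    levels = pattern.split("/")
    for index, level in enumerate(levels):
        if level == "#":
            # '#' is only legal as the final level
            if index != len(levels) - 1:
                return False
        elif "#" in level or ("+" in level and level != "+"):
            # a wildcard character must occupy a whole level on its own
            return False
    return True


print(is_valid_wildcard("sensors/+/temperature"))  # True
print(is_valid_wildcard("#"))                      # True
print(is_valid_wildcard("sensors/+temp"))          # False
print(is_valid_wildcard("sensors/#/temperature"))  # False
```

Splitting on / first keeps each rule local to a single level, which is why the check for # only needs to compare the level index with the last index.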

Usage examples:

from aiomqtt import Topic, Wildcard

# Single-level wildcard patterns
temp_wildcard = Wildcard("sensors/+/temperature")
room_wildcard = Wildcard("home/+/+")

# Multi-level wildcard patterns
all_sensors = Wildcard("sensors/#")
all_topics = Wildcard("#")

# Test topic matching
living_room_temp = Topic("sensors/living-room/temperature")
kitchen_temp = Topic("sensors/kitchen/temperature")
outdoor_humidity = Topic("sensors/outdoor/humidity")

# Single-level wildcard matches
print(living_room_temp.matches(temp_wildcard))  # True
print(kitchen_temp.matches(temp_wildcard))      # True
print(outdoor_humidity.matches(temp_wildcard))  # False

# Multi-level wildcard matches
print(living_room_temp.matches(all_sensors))    # True
print(outdoor_humidity.matches(all_sensors))    # True

# Complex pattern matching
home_topic = Topic("home/living-room/lights")
print(home_topic.matches(room_wildcard))        # True

# Wildcard validation
try:
    invalid_wildcard = Wildcard("sensors/+temperature")  # Invalid + usage
except ValueError as e:
    print(f"Invalid wildcard: {e}")

try:
    invalid_wildcard = Wildcard("sensors/#/temperature")  # # not at end
except ValueError as e:
    print(f"Invalid wildcard: {e}")
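
To make the matching results above easier to reason about, here is a minimal sketch of level-by-level matching on plain strings. The function name topic_matches is hypothetical, the sketch assumes both arguments are already valid, and it is not presented as how Topic.matches is implemented.

```python
def topic_matches(topic: str, pattern: str) -> bool:
    """Hypothetical helper: compare a topic with a pattern one level at a time."""
    topic_levels = topic.split("/")
    pattern_levels = pattern.split("/")
    for index, pattern_level in enumerate(pattern_levels):
        if pattern_level == "#":
            # '#' absorbs every remaining level, including none at all
            return True
        if index >= len(topic_levels):
            # the pattern has more levels than the topic
            return False
        if pattern_level != "+" and pattern_level != topic_levels[index]:
            # a literal level that differs from the topic's level
            return False
    # every pattern level matched; the topic must not have extra levels
    return len(topic_levels) == len(pattern_levels)


print(topic_matches("sensors/living-room/temperature", "sensors/+/temperature"))  # True
print(topic_matches("sensors/outdoor/humidity", "sensors/+/temperature"))         # False
print(topic_matches("sensors/outdoor/humidity", "sensors/#"))                     # True
print(topic_matches("home/living-room/lights/status", "home/+/+"))                # False
```

The last call shows why + is called single-level: home/+/+ covers exactly three levels, so a four-level topic does not match.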

Type Aliases for Flexible Usage

Type aliases provide flexibility in function parameters, accepting both string and object forms.

TopicLike = str | Topic
WildcardLike = str | Wildcard

Usage in functions:

from aiomqtt import Topic, Wildcard, TopicLike, WildcardLike

def process_topic(topic: TopicLike) -> None:
    """Process a topic, accepting string or Topic object."""
    if isinstance(topic, str):
        topic_obj = Topic(topic)
    else:
        topic_obj = topic
    
    print(f"Processing topic: {topic_obj}")

def check_match(topic: TopicLike, pattern: WildcardLike) -> bool:
    """Check if topic matches wildcard pattern."""
    # Convert to objects if needed
    if isinstance(topic, str):
        topic_obj = Topic(topic)
    else:
        topic_obj = topic
        
    if isinstance(pattern, str):
        wildcard_obj = Wildcard(pattern)
    else:
        wildcard_obj = pattern
    
    return topic_obj.matches(wildcard_obj)

# Usage examples
process_topic("sensors/temperature")           # String
process_topic(Topic("sensors/humidity"))      # Topic object

match1 = check_match("sensors/temp", "sensors/+")      # Both strings
match2 = check_match(Topic("home/lights"), Wildcard("home/#"))  # Both objects
print(f"Match 1: {match1}")  # True
print(f"Match 2: {match2}")  # True

Advanced Topic Operations

The examples below combine Topic and Wildcard with Client: they subscribe with a wildcard, publish to concrete topics, and route incoming messages by matching message.topic against patterns.

Usage examples:

import asyncio
from aiomqtt import Client, Topic, Wildcard

async def advanced_topic_operations():
    async with Client("test.mosquitto.org") as client:
        # Define topic hierarchy
        base_topic = "home"
        rooms = ["living-room", "kitchen", "bedroom"]
        sensors = ["temperature", "humidity", "light"]
        
        # Subscribe to all sensor data using wildcards
        all_sensors_wildcard = Wildcard(f"{base_topic}/+/+")
        await client.subscribe(str(all_sensors_wildcard))
        
        # Create specific topic patterns for filtering
        temp_wildcard = Wildcard(f"{base_topic}/+/temperature")
        humidity_wildcard = Wildcard(f"{base_topic}/+/humidity")
        
        # Publish test data
        for room in rooms:
            for sensor in sensors:
                topic = Topic(f"{base_topic}/{room}/{sensor}")
                await client.publish(str(topic), f"{sensor}_value")
        
        # Process received messages with topic matching
        message_count = 0
        async for message in client.messages:
            message_count += 1
            
            if message.topic.matches(temp_wildcard):
                room = str(message.topic).split('/')[1]
                print(f"Temperature in {room}: {message.payload}")
                
            elif message.topic.matches(humidity_wildcard):
                room = str(message.topic).split('/')[1]
                print(f"Humidity in {room}: {message.payload}")
                
            else:
                print(f"Other sensor data: {message.topic} = {message.payload}")
            
            # Stop after processing all test messages
            if message_count >= len(rooms) * len(sensors):
                break

async def topic_filtering_example():
    """Example of filtering messages by topic patterns."""
    async with Client("test.mosquitto.org") as client:
        # Subscribe to multiple patterns
        patterns = [
            "sensors/+/temperature",
            "sensors/+/humidity", 
            "alerts/#",
            "status/+"
        ]
        
        for pattern in patterns:
            await client.subscribe(pattern)
        
        # Define filter wildcards
        critical_alerts = Wildcard("alerts/critical/#")
        warning_alerts = Wildcard("alerts/warning/#")
        info_alerts = Wildcard("alerts/info/#")
        
        async for message in client.messages:
            # Route based on topic patterns
            if message.topic.matches(critical_alerts):
                print(f"CRITICAL ALERT: {message.payload}")
            elif message.topic.matches(warning_alerts):
                print(f"Warning: {message.payload}")
            elif message.topic.matches(info_alerts):
                print(f"Info: {message.payload}")
            else:
                # Handle sensor data or status updates
                topic_parts = str(message.topic).split('/')
                if len(topic_parts) >= 3 and topic_parts[0] == "sensors":
                    sensor_type = topic_parts[2]
                    device_id = topic_parts[1]
                    print(f"Sensor {device_id} {sensor_type}: {message.payload}")

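# Run the first example. topic_filtering_example() never breaks out of its
# message loop, so run it separately with asyncio.run(topic_filtering_example()).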
asyncio.run(advanced_topic_operations())
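
The else branch of topic_filtering_example pulls the device and sensor type out of the topic by index. A small sketch of the same idea as a reusable parser follows. The names SensorReading and parse_sensor_message are hypothetical, the sketch assumes the payload arrives as UTF-8 encoded bytes, and it accepts exactly three levels (the shape of the sensors/+/temperature and sensors/+/humidity subscriptions) rather than three or more.

```python
from typing import NamedTuple, Optional


class SensorReading(NamedTuple):
    """Hypothetical container for one decoded sensor message."""
    device_id: str
    sensor_type: str
    value: str


def parse_sensor_message(topic: str, payload: bytes) -> Optional[SensorReading]:
    """Hypothetical helper: decode a 'sensors/<device>/<type>' message."""
    parts = topic.split("/")
    if len(parts) != 3 or parts[0] != "sensors":
        # not a sensor topic; the caller decides what to do with it
        return None
    _, device_id, sensor_type = parts
    # assumption: the payload is UTF-8 text
    return SensorReading(device_id, sensor_type, payload.decode("utf-8"))


reading = parse_sensor_message("sensors/kitchen/temperature", b"21.5")
print(reading)
print(parse_sensor_message("alerts/critical/fire", b"smoke"))  # None
```

Inside the message loop this would be called with str(message.topic) and message.payload, keeping the index arithmetic in one place.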

Topic Validation Rules

The lists below show which strings are accepted as topics and as wildcard patterns:

Valid Topics:

  • sensors/temperature
  • home/living-room/lights/status
  • $SYS/broker/uptime (system topics starting with $)
  • user/data/2023/01/15

Invalid Topics:

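  • sensors/+/temperature (+ wildcard not allowed in topics)
  • home/# (# wildcard not allowed in topics)
  • sensors/temp+data (+ is not allowed anywhere in a topic, even inside a level)
  • an empty string (a topic must contain at least one character)

Note that sensors//temperature is legal under the MQTT specification: adjacent separators denote a zero-length topic level, so an empty level alone does not make a topic invalid.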

Valid Wildcards:

  • sensors/+ (single-level wildcard)
  • sensors/+/temperature (single-level in middle)
  • sensors/# (multi-level at end)
  • # (all topics)
  • sensors/+/+ (multiple single-level)

Invalid Wildcards:

  • sensors/+temp (+ not at level boundary)
  • sensors/#/temperature (# not at end)
  • sensors/temp# (# not at level boundary)

Constants

MAX_TOPIC_LENGTH: int = 65535

Maximum allowed MQTT topic length. The MQTT specification limits a UTF-8 encoded string, which includes topic names and topic filters, to 65,535 bytes.
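
Because the specification's limit applies to the encoded size, a topic with non-ASCII characters can exceed it while its character count is still below 65,535. The sketch below illustrates the difference; fits_mqtt_length is a hypothetical helper, and this page does not state whether the library itself counts characters or bytes.

```python
MAX_TOPIC_LENGTH = 65535  # value documented above


def fits_mqtt_length(topic: str) -> bool:
    """Hypothetical helper: check the UTF-8 encoded size of a topic."""
    return 0 < len(topic.encode("utf-8")) <= MAX_TOPIC_LENGTH


print(fits_mqtt_length("sensors/temperature"))  # True

long_topic = "é" * 40000                # 40,000 characters
print(len(long_topic.encode("utf-8")))  # 80000 bytes, since each 'é' takes two
print(fits_mqtt_length(long_topic))     # False
```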

Install with Tessl CLI

npx tessl i tessl/pypi-aiomqtt
