tessl/pypi-aiomqtt

The idiomatic asyncio MQTT client

Message Handling

Message structure and processing capabilities for received MQTT messages, including topic matching, payload access, and message queue management.

Capabilities

Message Structure

The Message class represents an MQTT message received from the broker, providing access to all message components and metadata.

@dataclass
class Message:
    topic: Topic
    payload: PayloadType  
    qos: int
    retain: bool
    mid: int
    properties: Properties | None
    
    def __lt__(self, other: Message) -> bool:
        """
        Compare messages by message ID for ordering.

        Args:
            other (Message): Another message to compare against

        Returns:
            bool: True if this message's ID is less than the other's
        """

Message attributes:

  • topic: The MQTT topic the message was published to, as a Topic object
  • payload: The message payload. Messages received from the broker typically arrive as raw bytes, so decode them in application code as needed
  • qos: Quality of service level (0, 1, or 2) used for message delivery
  • retain: Whether this message was retained on the broker
  • mid: Message (packet) identifier of the delivery. Identifiers are reused over time, and in MQTT only QoS 1 and QoS 2 messages carry one
  • properties: MQTT v5.0 message properties (None for older protocol versions)

Usage example:

import asyncio
from aiomqtt import Client

async def message_inspection():
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("sensors/#")
        
        async for message in client.messages:
            # Access message components
            print(f"Topic: {message.topic}")
            print(f"Payload: {message.payload}")
            print(f"QoS: {message.qos}")
            print(f"Retained: {message.retain}")
            print(f"Message ID: {message.mid}")
            
            # Handle different payload types
            if isinstance(message.payload, str):
                print(f"Text payload: {message.payload}")
            elif isinstance(message.payload, bytes):
                print(f"Binary payload: {len(message.payload)} bytes")
            elif isinstance(message.payload, (int, float)):
                print(f"Numeric payload: {message.payload}")
            elif message.payload is None:
                print("Empty payload")

asyncio.run(message_inspection())
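
Because __lt__ compares message IDs, messages can be sorted or placed on a heap. The sketch below illustrates this with a hypothetical stand-in class, StubMessage, that mirrors only a few fields and the comparison described above. It is not aiomqtt's Message, and it assumes the comparison is simply self.mid < other.mid.

```python
import heapq
from dataclasses import dataclass


@dataclass
class StubMessage:
    """Hypothetical stand-in mirroring a few Message fields (not aiomqtt's class)."""

    topic: str
    payload: bytes
    mid: int

    def __lt__(self, other: "StubMessage") -> bool:
        # Order by message ID, as the Message docstring above describes.
        return self.mid < other.mid


def drain_in_mid_order(messages: list[StubMessage]) -> list[int]:
    """Push messages onto a heap and pop them back in ascending mid order."""
    heap: list[StubMessage] = []
    for message in messages:
        heapq.heappush(heap, message)
    return [heapq.heappop(heap).mid for _ in range(len(heap))]


if __name__ == "__main__":
    received = [
        StubMessage("sensors/a", b"1", mid=7),
        StubMessage("sensors/b", b"2", mid=3),
        StubMessage("sensors/c", b"3", mid=5),
    ]
    print(drain_in_mid_order(received))
```

The same comparison is what lets such objects sit directly in a priority queue without a separate sort key.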

Message Queue Iterator

The MessagesIterator provides an async iterator interface for receiving messages from the broker, with queue length inspection capabilities.

class MessagesIterator:
    def __aiter__(self) -> AsyncIterator[Message]:
        """
        Return async iterator protocol.

        Returns:
            AsyncIterator[Message]: Async iterator for messages
        """
    
    async def __anext__(self) -> Message:
        """
        Get next message from the queue.

        Returns:
            Message: Next received message

        Raises:
            StopAsyncIteration: When iteration is stopped
            MqttError: If there's an error receiving messages
        """
    
    def __len__(self) -> int:
        """
        Get number of messages currently in the queue.

        Returns:
            int: Number of queued messages waiting to be processed
        """
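
To make the three methods concrete, here is a minimal sketch of an iterator with the same shape, backed by an asyncio.Queue. QueueBackedIterator is a hypothetical name and this is not aiomqtt's implementation. It only illustrates how __aiter__, an awaitable __anext__, and __len__ can fit together.

```python
import asyncio
from typing import AsyncIterator


class QueueBackedIterator:
    """Hypothetical stand-in: async iteration over items placed on a queue."""

    def __init__(self, queue: "asyncio.Queue[str]") -> None:
        self._queue = queue

    def __aiter__(self) -> AsyncIterator[str]:
        return self

    async def __anext__(self) -> str:
        # Suspends until an item is available on the queue.
        return await self._queue.get()

    def __len__(self) -> int:
        # Number of items still waiting, analogous to the queue length above.
        return self._queue.qsize()


async def demo() -> list[tuple[str, int]]:
    queue: "asyncio.Queue[str]" = asyncio.Queue()
    for item in ("a", "b", "c"):
        queue.put_nowait(item)

    messages = QueueBackedIterator(queue)
    seen = []
    async for item in messages:
        # len() reports what is still queued after taking this item.
        seen.append((item, len(messages)))
        if len(messages) == 0:
            break  # end the demo instead of waiting forever for more items
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```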

Usage examples:

import asyncio
from aiomqtt import Client

async def queue_monitoring():
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("high-volume/data/#")
        
        # Monitor queue length
        message_count = 0
        async for message in client.messages:
            message_count += 1
            queue_length = len(client.messages)
            
            print(f"Processed {message_count} messages")
            print(f"Queue length: {queue_length}")
            
            # Handle queue buildup
            if queue_length > 100:
                print("Warning: High message queue buildup")
            
            # Process message
            print(f"Received: {message.payload} on {message.topic}")

async def selective_processing():
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("sensors/#")
        
        async for message in client.messages:
            # Process only specific message types
            if message.topic.matches("sensors/temperature"):
                temp_value = float(message.payload)
                print(f"Temperature: {temp_value}°C")
            elif message.topic.matches("sensors/humidity"):
                humidity_value = float(message.payload)
                print(f"Humidity: {humidity_value}%")
            else:
                print(f"Ignoring message on {message.topic}")

# Run one example; swap in selective_processing() to try the other
asyncio.run(queue_monitoring())
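
If handlers are slow, the backlog reported by len(client.messages) grows. One common way to keep up is to hand messages to a small pool of worker tasks. The sketch below shows that pattern with the standard library only: fake_messages is a hypothetical stand-in for client.messages, and handle is a placeholder for real processing.

```python
import asyncio
from typing import AsyncIterator


async def fake_messages(count: int) -> AsyncIterator[str]:
    """Hypothetical stand-in for client.messages: yields `count` payloads."""
    for i in range(count):
        yield f"reading-{i}"


async def handle(payload: str) -> str:
    """Placeholder for real (possibly slow) message processing."""
    await asyncio.sleep(0.01)
    return payload.upper()


async def worker(queue: "asyncio.Queue[str]", results: list[str]) -> None:
    # Each worker pulls payloads until it is cancelled.
    while True:
        payload = await queue.get()
        try:
            results.append(await handle(payload))
        finally:
            queue.task_done()


async def process_concurrently(count: int, workers: int = 3) -> list[str]:
    queue: "asyncio.Queue[str]" = asyncio.Queue(maxsize=100)
    results: list[str] = []
    tasks = [asyncio.create_task(worker(queue, results)) for _ in range(workers)]

    async for payload in fake_messages(count):
        # put() waits when the queue is full, which applies backpressure.
        await queue.put(payload)

    await queue.join()  # wait until every queued payload has been handled
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(len(asyncio.run(process_concurrently(10))))
```

With several workers the results may complete out of arrival order, so this approach suits handlers that do not depend on strict ordering.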

Message Filtering and Processing

Combine message iteration with topic matching for sophisticated message processing workflows.

Usage examples:

import asyncio
from aiomqtt import Client, Wildcard

async def advanced_message_processing():
    async with Client("test.mosquitto.org") as client:
        # Subscribe to multiple topic patterns
        await client.subscribe([
            ("sensors/+/temperature", 1),
            ("alerts/#", 2),
            ("status/+/online", 0)
        ])
        
        # Define topic patterns for filtering
        temp_wildcard = Wildcard("sensors/+/temperature")
        alert_wildcard = Wildcard("alerts/#")
        status_wildcard = Wildcard("status/+/online")
        
        async for message in client.messages:
            # Route messages based on topic patterns
            if message.topic.matches(temp_wildcard):
                await process_temperature(message)
            elif message.topic.matches(alert_wildcard):
                await process_alert(message)  
            elif message.topic.matches(status_wildcard):
                await process_status(message)
            else:
                print(f"Unhandled message on {message.topic}")

async def process_temperature(message):
    """Process temperature sensor messages."""
    try:
        temp = float(message.payload)
        device_id = str(message.topic).split('/')[1]
        print(f"Device {device_id} temperature: {temp}°C")
        
        if temp > 30:
            print(f"Warning: High temperature on device {device_id}")
    except (ValueError, IndexError) as e:
        print(f"Error processing temperature message: {e}")

async def process_alert(message):
    """Process alert messages with priority handling."""
    priority = "high" if message.qos == 2 else "normal"
    print(f"Alert ({priority}): {message.payload}")
    
    # Handle retained alerts
    if message.retain:
        print("This is a retained alert message")

async def process_status(message):
    """Process device status messages."""
    device_id = str(message.topic).split('/')[1]
    status = message.payload.decode() if isinstance(message.payload, bytes) else str(message.payload)
    print(f"Device {device_id} status: {status}")

# Run advanced processing
asyncio.run(advanced_message_processing())
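
The routing above relies on MQTT wildcard semantics: + matches exactly one topic level, and # matches all remaining levels, including none. The pure-Python sketch below illustrates those rules. topic_matches and route are hypothetical helpers, not aiomqtt's Topic.matches implementation, and the sketch assumes # appears only as the last level and ignores the special handling of $-prefixed topics.

```python
from typing import Optional


def topic_matches(topic: str, pattern: str) -> bool:
    """Return True if `topic` matches the wildcard `pattern` (illustrative only)."""
    topic_levels = topic.split("/")
    pattern_levels = pattern.split("/")

    for index, level in enumerate(pattern_levels):
        if level == "#":
            # Multi-level wildcard: matches the rest, including zero levels.
            return True
        if index >= len(topic_levels):
            # The pattern has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            # A literal level must match exactly; "+" matches any single level.
            return False

    # Without "#", both must have the same number of levels.
    return len(topic_levels) == len(pattern_levels)


def route(topic: str, routes: dict[str, str]) -> Optional[str]:
    """Return the handler name of the first matching pattern, if any."""
    for pattern, handler_name in routes.items():
        if topic_matches(topic, pattern):
            return handler_name
    return None


if __name__ == "__main__":
    routes = {
        "sensors/+/temperature": "temperature",
        "alerts/#": "alert",
        "status/+/online": "status",
    }
    for topic in ("sensors/dev1/temperature", "alerts/fire/kitchen", "other/topic"):
        print(topic, "->", route(topic, routes))
```

Because the first matching pattern wins, the order of entries in the routing table matters when patterns overlap.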

Message Payload Handling

Handle different payload types and encodings safely.

Usage example:

import asyncio
import json
from aiomqtt import Client

async def payload_handling():
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("data/#")
        
        async for message in client.messages:
            payload = message.payload
            
            # Handle different payload types
            if payload is None:
                print(f"Empty message on {message.topic}")
                
            elif isinstance(payload, str):
                # String payload - could be JSON, plain text, etc.
                if payload.startswith('{') or payload.startswith('['):
                    try:
                        data = json.loads(payload)
                        print(f"JSON data: {data}")
                    except json.JSONDecodeError:
                        print(f"Text payload: {payload}")
                else:
                    print(f"Text payload: {payload}")
                    
            elif isinstance(payload, bytes):
                # Binary payload - attempt UTF-8 decode
                try:
                    text = payload.decode('utf-8')
                    print(f"Decoded text: {text}")
                except UnicodeDecodeError:
                    print(f"Binary data: {len(payload)} bytes")
                    
            elif isinstance(payload, (int, float)):
                # Numeric payload
                print(f"Numeric value: {payload}")
                
            else:
                print(f"Unknown payload type: {type(payload)}")

asyncio.run(payload_handling())
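
The branching above can be folded into one helper that normalizes any PayloadType value. decode_payload is a hypothetical helper, not part of aiomqtt. It assumes text payloads are UTF-8 and treats any text that parses as JSON as structured data.

```python
import json
from typing import Any, Union

# Mirrors the PayloadType alias listed under Type Definitions.
PayloadType = Union[str, bytes, bytearray, int, float, None]


def decode_payload(payload: PayloadType) -> Any:
    """Best-effort conversion of a payload into a Python value.

    Returns None when there is no payload, numbers unchanged, parsed JSON
    when the text is valid JSON, the decoded string otherwise, and the raw
    bytes when the data is not valid UTF-8.
    """
    if payload is None:
        return None
    if isinstance(payload, (int, float)):
        return payload
    if isinstance(payload, (bytes, bytearray)):
        try:
            text = bytes(payload).decode("utf-8")
        except UnicodeDecodeError:
            # Not text: hand back the binary data untouched.
            return bytes(payload)
    else:
        text = payload
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Plain text that is not JSON.
        return text


if __name__ == "__main__":
    for sample in (b'{"temp": 21.5}', b"hello", b"\xff\xfe", None):
        print(repr(decode_payload(sample)))
```

Note that a bare numeric string such as "23.5" is valid JSON, so this helper returns it as a number rather than as text.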

Type Definitions

PayloadType = str | bytes | bytearray | int | float | None

Install with Tessl CLI

npx tessl i tessl/pypi-aiomqtt
