The idiomatic asyncio MQTT client
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Message structure and processing capabilities for received MQTT messages, including topic matching, payload access, and message queue management.
The Message class represents an MQTT message received from the broker, providing access to all message components and metadata.
@dataclass
class Message:
    """An MQTT message received from the broker.

    Attributes:
        topic: Topic of the message.
        payload: Payload carried by the message.
        qos: QoS value of the message.
        retain: Whether the message is a retained message.
        mid: Message ID; messages are ordered by this value.
        properties: Properties attached to the message, or ``None``.
    """

    topic: Topic
    payload: PayloadType
    qos: int
    retain: bool
    mid: int
    properties: Properties | None

    def __lt__(self, other: Message) -> bool:
        """Compare messages by message ID for ordering.

        Args:
            other: Another message to compare against.

        Returns:
            True if this message's ID is less than the other's.
        """
        return self.mid < other.mid


# Message attributes:
Usage example:
import asyncio
from aiomqtt import Client
async def message_inspection():
    """Subscribe to ``sensors/#`` and print the components of each message."""
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("sensors/#")

        async for message in client.messages:
            # Access message components
            print(f"Topic: {message.topic}")
            print(f"Payload: {message.payload}")
            print(f"QoS: {message.qos}")
            print(f"Retained: {message.retain}")
            print(f"Message ID: {message.mid}")

            # Handle different payload types
            if isinstance(message.payload, str):
                print(f"Text payload: {message.payload}")
            elif isinstance(message.payload, (bytes, bytearray)):
                # bytearray is part of PayloadType and is binary data as well.
                print(f"Binary payload: {len(message.payload)} bytes")
            elif isinstance(message.payload, (int, float)):
                print(f"Numeric payload: {message.payload}")
            elif message.payload is None:
                print("Empty payload")
asyncio.run(message_inspection())

The MessagesIterator provides an async iterator interface for receiving messages from the broker, with queue length inspection capabilities.
class MessagesIterator:
    """Async iterator over messages received from the broker.

    The number of messages currently queued is available through ``len()``.
    """

    def __aiter__(self) -> AsyncIterator[Message]:
        """Return async iterator protocol.

        Returns:
            AsyncIterator[Message]: Async iterator for messages
        """

    # Must be a coroutine function: ``async for`` awaits the result of
    # ``__anext__``, so a plain ``def`` returning a Message would fail.
    async def __anext__(self) -> Message:
        """Get next message from the queue.

        Returns:
            Message: Next received message

        Raises:
            StopAsyncIteration: When iteration is stopped
            MqttError: If there's an error receiving messages
        """

    def __len__(self) -> int:
        """Get number of messages currently in the queue.

        Returns:
            int: Number of queued messages waiting to be processed
        """


# Usage examples:
import asyncio
from aiomqtt import Client
async def queue_monitoring():
    """Consume ``high-volume/data/#`` while reporting the message queue length."""
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("high-volume/data/#")

        # Monitor queue length
        message_count = 0
        async for message in client.messages:
            message_count += 1
            # len() on the iterator reports messages still waiting in the queue.
            queue_length = len(client.messages)

            print(f"Processed {message_count} messages")
            print(f"Queue length: {queue_length}")

            # Handle queue buildup
            if queue_length > 100:
                print("Warning: High message queue buildup")

            # Process message
            print(f"Received: {message.payload} on {message.topic}")
async def selective_processing():
    """Subscribe to ``sensors/#`` and handle only temperature and humidity topics."""
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("sensors/#")

        async for message in client.messages:
            # Process only specific message types
            if message.topic.matches("sensors/temperature"):
                temp_value = float(message.payload)
                print(f"Temperature: {temp_value}°C")
            elif message.topic.matches("sensors/humidity"):
                humidity_value = float(message.payload)
                print(f"Humidity: {humidity_value}%")
            else:
                print(f"Ignoring message on {message.topic}")
# Run examples
asyncio.run(queue_monitoring())

Combine message iteration with topic matching for sophisticated message processing workflows.
Usage examples:
import asyncio
from aiomqtt import Client, Topic, Wildcard
async def advanced_message_processing():
    """Subscribe to several topic patterns and route each message to a handler."""
    async with Client("test.mosquitto.org") as client:
        # Subscribe to multiple topic patterns
        await client.subscribe([
            ("sensors/+/temperature", 1),
            ("alerts/#", 2),
            ("status/+/online", 0),
        ])

        # Define topic patterns for filtering
        temp_wildcard = Wildcard("sensors/+/temperature")
        alert_wildcard = Wildcard("alerts/#")
        status_wildcard = Wildcard("status/+/online")

        async for message in client.messages:
            # Route messages based on topic patterns
            if message.topic.matches(temp_wildcard):
                await process_temperature(message)
            elif message.topic.matches(alert_wildcard):
                await process_alert(message)
            elif message.topic.matches(status_wildcard):
                await process_status(message)
            else:
                print(f"Unhandled message on {message.topic}")
async def process_temperature(message):
    """Process temperature sensor messages.

    Parses the payload as a float, takes the device ID from the second
    topic level, and prints the reading plus a warning above 30 degrees.
    Malformed messages are reported instead of raising.
    """
    try:
        temp = float(message.payload)
        device_id = str(message.topic).split('/')[1]
        print(f"Device {device_id} temperature: {temp}°C")

        if temp > 30:
            print(f"Warning: High temperature on device {device_id}")
    except (TypeError, ValueError, IndexError) as e:
        # TypeError covers an empty (None) payload, which float() rejects.
        print(f"Error processing temperature message: {e}")
async def process_alert(message):
    """Process alert messages with priority handling.

    Alerts delivered with QoS 2 are labelled "high"; all others "normal".
    """
    priority = "high" if message.qos == 2 else "normal"
    print(f"Alert ({priority}): {message.payload}")

    # Handle retained alerts
    if message.retain:
        print("This is a retained alert message")
async def process_status(message):
    """Process device status messages.

    Takes the device ID from the second topic level and prints the payload
    as text, decoding binary payloads first.
    """
    device_id = str(message.topic).split('/')[1]
    payload = message.payload
    # bytearray must be decoded too; str() would give "bytearray(b'...')".
    if isinstance(payload, (bytes, bytearray)):
        status = payload.decode()
    else:
        status = str(payload)
    print(f"Device {device_id} status: {status}")
# Run advanced processing
asyncio.run(advanced_message_processing())

Handle different payload types and encodings safely.
Usage example:
import asyncio
import json
from aiomqtt import Client
async def payload_handling():
    """Subscribe to ``data/#`` and print each payload according to its type."""
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("data/#")

        async for message in client.messages:
            payload = message.payload

            # Handle different payload types
            if payload is None:
                print(f"Empty message on {message.topic}")

            elif isinstance(payload, str):
                # String payload - could be JSON, plain text, etc.
                if payload.startswith(('{', '[')):
                    try:
                        data = json.loads(payload)
                        print(f"JSON data: {data}")
                    except json.JSONDecodeError:
                        print(f"Text payload: {payload}")
                else:
                    print(f"Text payload: {payload}")

            elif isinstance(payload, (bytes, bytearray)):
                # Binary payload - attempt UTF-8 decode. bytearray is part of
                # PayloadType and decodes the same way as bytes.
                try:
                    text = payload.decode('utf-8')
                    print(f"Decoded text: {text}")
                except UnicodeDecodeError:
                    print(f"Binary data: {len(payload)} bytes")

            elif isinstance(payload, (int, float)):
                # Numeric payload
                print(f"Numeric value: {payload}")

            else:
                print(f"Unknown payload type: {type(payload)}")
asyncio.run(payload_handling())

PayloadType = str | bytes | bytearray | int | float | None

Install with Tessl CLI
npx tessl i tessl/pypi-aiomqtt