The idiomatic asyncio MQTT client
npx @tessl/cli install tessl/pypi-aiomqtt@2.4.0
aiomqtt is the idiomatic asyncio MQTT client library; it eliminates callback-based programming in favor of modern Python async/await patterns. It provides a clean, Pythonic API for MQTT communication with support for MQTT versions 5.0, 3.1.1, and 3.1, featuring comprehensive type hints, graceful connection management, and seamless integration with asyncio event loops.
pip install aiomqtt
import aiomqtt
Common usage patterns:
from aiomqtt import Client, Message, Topic, Wildcard
All public components:
from aiomqtt import (
Client,
Message,
MessagesIterator,
MqttCodeError,
MqttError,
MqttReentrantError,
ProtocolVersion,
ProxySettings,
TLSParameters,
Topic,
TopicLike,
Wildcard,
WildcardLike,
Will,
)
import asyncio
from aiomqtt import Client
async def publish_example():
    """Open a Client against test.mosquitto.org and publish 28.4 to ``temperature/outside``."""
    # The client is used as an async context manager, so it is only live inside this block.
    async with Client("test.mosquitto.org") as client:
        await client.publish("temperature/outside", payload=28.4)
async def subscribe_example():
    """Subscribe to ``temperature/#`` on test.mosquitto.org and print each received message."""
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("temperature/#")
        # client.messages is consumed with ``async for``; one iteration per message.
        async for message in client.messages:
            print(f"Received: {message.payload} on {message.topic}")
# Run examples
# asyncio.run() executes the coroutine to completion before returning.
asyncio.run(publish_example())
asyncio.run(subscribe_example())
aiomqtt is built on the proven paho-mqtt foundation while providing an asyncio-native interface. Key architectural components:
The library eliminates callbacks entirely, using async/await patterns and structured exceptions for clean, maintainable MQTT client code.
Essential MQTT client functionality including connection management, publishing, and subscribing. The Client class serves as the main entry point for all MQTT operations.
class Client:
def __init__(
self,
hostname: str,
port: int = 1883,
*,
username: str | None = None,
password: str | None = None,
logger: logging.Logger | None = None,
identifier: str | None = None,
queue_type: type[asyncio.Queue[Message]] | None = None,
protocol: ProtocolVersion | None = None,
will: Will | None = None,
clean_session: bool | None = None,
transport: Literal["tcp", "websockets", "unix"] = "tcp",
timeout: float | None = None,
keepalive: int = 60,
bind_address: str = "",
bind_port: int = 0,
clean_start: mqtt.CleanStartOption = mqtt.MQTT_CLEAN_START_FIRST_ONLY,
max_queued_incoming_messages: int | None = None,
max_queued_outgoing_messages: int | None = None,
max_inflight_messages: int | None = None,
max_concurrent_outgoing_calls: int | None = None,
properties: Properties | None = None,
tls_context: ssl.SSLContext | None = None,
tls_params: TLSParameters | None = None,
tls_insecure: bool | None = None,
proxy: ProxySettings | None = None,
socket_options: Iterable[SocketOption] | None = None,
websocket_path: str | None = None,
websocket_headers: WebSocketHeaders | None = None,
): ...
async def __aenter__(self) -> Self: ...
async def __aexit__(self, exc_type, exc, tb) -> None: ...
async def publish(
self,
/,
topic: str,
payload: PayloadType = None,
qos: int = 0,
retain: bool = False,
properties: Properties | None = None,
*args: Any,
timeout: float | None = None,
**kwargs: Any,
) -> None: ...
async def subscribe(
self,
/,
topic: SubscribeTopic,
qos: int = 0,
options: SubscribeOptions | None = None,
properties: Properties | None = None,
*args: Any,
timeout: float | None = None,
**kwargs: Any,
) -> tuple[int, ...] | list[ReasonCode]: ...Message structure and processing capabilities for received MQTT messages, including topic matching and payload access.
class Message:
    """A received MQTT message; only the attribute types are shown."""

    topic: Topic
    payload: PayloadType
    qos: int
    retain: bool
    mid: int
    properties: Properties | None
class MessagesIterator:
def __aiter__(self) -> AsyncIterator[Message]: ...
def __anext__(self) -> Message: ...
def __len__(self) -> int: ...Topic validation, wildcard support, and matching logic for MQTT topic patterns.
@dataclass
class Topic:
    """Dataclass wrapping a topic string in ``value``."""

    value: str

    # Accepts either a plain string or a Wildcard (see WildcardLike).
    def matches(self, wildcard: WildcardLike) -> bool: ...
@dataclass
class Wildcard:
    """Dataclass wrapping a wildcard pattern string in ``value``."""

    value: str
# Type aliases referenced by the signatures in this reference.
# A topic or wildcard may be given as a plain string or as the wrapper class.
TopicLike = str | Topic
WildcardLike = str | Wildcard
# Type of the `payload` argument of Client.publish and of Will.payload.
PayloadType = str | bytes | bytearray | int | float | None
# Type of the `topic` argument of Client.subscribe.
SubscribeTopic = str | tuple[str, SubscribeOptions] | list[tuple[str, SubscribeOptions]] | list[tuple[str, int]]
# Type of the `websocket_headers` argument of Client.__init__.
WebSocketHeaders = dict[str, str] | Callable[[dict[str, str]], dict[str, str]]
SocketOption = tuple[int, int, int | bytes] | tuple[int, int, None, int]
TLS/SSL configuration, authentication, protocol versions, proxy settings, and connection parameters.
# From paho.mqtt.client and paho.mqtt.properties
class Properties:
    # MQTT v5.0 properties (from paho-mqtt)
    pass
class ReasonCode:
    # MQTT v5.0 reason codes (from paho-mqtt)
    value: int
class SubscribeOptions:
    # MQTT v5.0 subscription options (from paho-mqtt)
    pass
# Module types and classes
class ProtocolVersion(Enum):
    """MQTT protocol versions selectable through Client's ``protocol`` argument."""

    # Values come from the ``mqtt`` module
    # (presumably paho.mqtt.client imported as ``mqtt`` — confirm against the library source).
    V31 = mqtt.MQTTv31
    V311 = mqtt.MQTTv311
    V5 = mqtt.MQTTv5
@dataclass
class TLSParameters:
    """TLS settings passed to Client via ``tls_params``; every field defaults to None."""

    ca_certs: str | None = None
    certfile: str | None = None
    keyfile: str | None = None
    cert_reqs: ssl.VerifyMode | None = None
    tls_version: Any | None = None
    ciphers: str | None = None
    keyfile_password: str | None = None
# NOTE(review): @dataclass is combined with an explicit __init__ and no fields.
# The decorator does not replace a hand-written __init__, so construction goes
# through the keyword-only signature below — confirm against the library source
# whether ProxySettings is really a dataclass.
@dataclass
class ProxySettings:
    """Proxy configuration passed to Client via ``proxy``."""

    def __init__(
        self,
        *,  # all arguments are keyword-only
        proxy_type: int,
        proxy_addr: str,
        proxy_rdns: bool | None = True,
        proxy_username: str | None = None,
        proxy_password: str | None = None,
        proxy_port: int | None = None,
    ) -> None: ...
@dataclass
class Will:
topic: str
payload: PayloadType | None = None
qos: int = 0
retain: bool = False
properties: Properties | None = NoneStructured exception hierarchy for MQTT errors, replacing callback-based error reporting with modern exception handling.
class MqttError(Exception):
    """Base class of the MQTT exception hierarchy."""
class MqttCodeError(MqttError):
    """MqttError that carries a result code in ``rc``."""

    # May be a plain int, a ReasonCode, or None.
    rc: int | ReasonCode | None
class MqttReentrantError(MqttError): ...
# Module-level version attributes.
__version__: str
__version_tuple__: tuple[int, int, int]