or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

configuration-security.mdcore-client.mderror-handling.mdindex.mdmessage-handling.mdtopic-management.md
tile.json

tessl/pypi-aiomqtt

The idiomatic asyncio MQTT client

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/aiomqtt@2.4.x

To install, run

npx @tessl/cli install tessl/pypi-aiomqtt@2.4.0

index.mddocs/

aiomqtt

The idiomatic asyncio MQTT client library that eliminates callback-based programming in favor of modern Python async/await patterns. It provides a clean, Pythonic API for MQTT communication with support for MQTT versions 5.0, 3.1.1, and 3.1, featuring comprehensive type hints, graceful connection management, and seamless integration with asyncio event loops.

Package Information

  • Package Name: aiomqtt
  • Package Type: pypi
  • Language: Python
  • Installation: pip install aiomqtt

Core Imports

import aiomqtt

Common usage patterns:

from aiomqtt import Client, Message, Topic, Wildcard

All public components:

from aiomqtt import (
    Client,
    Message,
    MessagesIterator,
    MqttCodeError,
    MqttError,
    MqttReentrantError,
    ProtocolVersion,
    ProxySettings,
    TLSParameters,
    Topic,
    TopicLike,
    Wildcard,
    WildcardLike,
    Will,
)

Basic Usage

import asyncio
from aiomqtt import Client

async def publish_example():
    async with Client("test.mosquitto.org") as client:
        await client.publish("temperature/outside", payload=28.4)

async def subscribe_example():
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("temperature/#")
        async for message in client.messages:
            print(f"Received: {message.payload} on {message.topic}")

# Run examples
asyncio.run(publish_example())
asyncio.run(subscribe_example())

Architecture

aiomqtt is built on the proven paho-mqtt foundation while providing an asyncio-native interface. Key architectural components:

  • Client: Async context manager handling broker connections and operations
  • MessagesIterator: Dynamic view of the client's message queue for async iteration
  • Message: Immutable message wrapper with topic matching capabilities
  • Topic/Wildcard: Validation and matching logic for MQTT topic patterns
  • Error Hierarchy: Structured exception handling replacing callback-based error reporting

The library eliminates callbacks entirely, using async/await patterns and structured exceptions for clean, maintainable MQTT client code.

Capabilities

Core Client Operations

Essential MQTT client functionality including connection management, publishing, and subscribing. The Client class serves as the main entry point for all MQTT operations.

class Client:
    def __init__(
        self,
        hostname: str,
        port: int = 1883,
        *,
        username: str | None = None,
        password: str | None = None,
        logger: logging.Logger | None = None,
        identifier: str | None = None,
        queue_type: type[asyncio.Queue[Message]] | None = None,
        protocol: ProtocolVersion | None = None,
        will: Will | None = None,
        clean_session: bool | None = None,
        transport: Literal["tcp", "websockets", "unix"] = "tcp",
        timeout: float | None = None,
        keepalive: int = 60,
        bind_address: str = "",
        bind_port: int = 0,
        clean_start: mqtt.CleanStartOption = mqtt.MQTT_CLEAN_START_FIRST_ONLY,
        max_queued_incoming_messages: int | None = None,
        max_queued_outgoing_messages: int | None = None,
        max_inflight_messages: int | None = None,
        max_concurrent_outgoing_calls: int | None = None,
        properties: Properties | None = None,
        tls_context: ssl.SSLContext | None = None,
        tls_params: TLSParameters | None = None,
        tls_insecure: bool | None = None,
        proxy: ProxySettings | None = None,
        socket_options: Iterable[SocketOption] | None = None,
        websocket_path: str | None = None,
        websocket_headers: WebSocketHeaders | None = None,
    ): ...
    
    async def __aenter__(self) -> Self: ...
    async def __aexit__(self, exc_type, exc, tb) -> None: ...
    async def publish(
        self,
        /,
        topic: str,
        payload: PayloadType = None,
        qos: int = 0,
        retain: bool = False,
        properties: Properties | None = None,
        *args: Any,
        timeout: float | None = None,
        **kwargs: Any,
    ) -> None: ...
    async def subscribe(
        self,
        /,
        topic: SubscribeTopic,
        qos: int = 0,
        options: SubscribeOptions | None = None,
        properties: Properties | None = None,
        *args: Any,
        timeout: float | None = None,
        **kwargs: Any,
    ) -> tuple[int, ...] | list[ReasonCode]: ...

Core Client Operations

Message Handling

Message structure and processing capabilities for received MQTT messages, including topic matching and payload access.

class Message:
    topic: Topic
    payload: PayloadType
    qos: int
    retain: bool
    mid: int
    properties: Properties | None

class MessagesIterator:
    def __aiter__(self) -> AsyncIterator[Message]: ...
    def __anext__(self) -> Message: ...
    def __len__(self) -> int: ...

Message Handling

Topic Management

Topic validation, wildcard support, and matching logic for MQTT topic patterns.

@dataclass
class Topic:
    value: str
    def matches(self, wildcard: WildcardLike) -> bool: ...

@dataclass  
class Wildcard:
    value: str

TopicLike = str | Topic
WildcardLike = str | Wildcard

PayloadType = str | bytes | bytearray | int | float | None
SubscribeTopic = str | tuple[str, SubscribeOptions] | list[tuple[str, SubscribeOptions]] | list[tuple[str, int]]
WebSocketHeaders = dict[str, str] | Callable[[dict[str, str]], dict[str, str]]
SocketOption = tuple[int, int, int | bytes] | tuple[int, int, None, int]

Topic Management

Configuration & Security

TLS/SSL configuration, authentication, protocol versions, proxy settings, and connection parameters.

# From paho.mqtt.client and paho.mqtt.properties
class Properties:
    # MQTT v5.0 properties (from paho-mqtt)
    pass

class ReasonCode:
    # MQTT v5.0 reason codes (from paho-mqtt)
    value: int

class SubscribeOptions:
    # MQTT v5.0 subscription options (from paho-mqtt)
    pass

# Module types and classes
class ProtocolVersion(Enum):
    V31 = mqtt.MQTTv31
    V311 = mqtt.MQTTv311  
    V5 = mqtt.MQTTv5

@dataclass
class TLSParameters:
    ca_certs: str | None = None
    certfile: str | None = None
    keyfile: str | None = None
    cert_reqs: ssl.VerifyMode | None = None
    tls_version: Any | None = None
    ciphers: str | None = None
    keyfile_password: str | None = None

@dataclass
class ProxySettings:
    def __init__(
        self,
        *,
        proxy_type: int,
        proxy_addr: str,
        proxy_rdns: bool | None = True,
        proxy_username: str | None = None,
        proxy_password: str | None = None,
        proxy_port: int | None = None,
    ) -> None: ...

@dataclass
class Will:
    topic: str
    payload: PayloadType | None = None
    qos: int = 0
    retain: bool = False
    properties: Properties | None = None

Configuration & Security

Error Handling

Structured exception hierarchy for MQTT errors, replacing callback-based error reporting with modern exception handling.

class MqttError(Exception): ...

class MqttCodeError(MqttError):
    rc: int | ReasonCode | None


class MqttReentrantError(MqttError): ...

Error Handling

Version Information

__version__: str
__version_tuple__: tuple[int, int, int]