CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aiomqtt

The idiomatic asyncio MQTT client

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

core-client.mddocs/

Core Client Operations

Essential MQTT client functionality including connection management, publishing, and subscribing. The Client class serves as the main entry point for all MQTT operations using async context manager patterns.

Capabilities

Client Connection Management

The Client class provides async context manager support for automatic connection and disconnection management, eliminating the need for manual connection lifecycle handling.

class Client:
    def __init__(
        self,
        hostname: str,
        port: int = 1883,
        *,
        username: str | None = None,
        password: str | None = None,
        logger: logging.Logger | None = None,
        identifier: str | None = None,
        queue_type: type[asyncio.Queue[Message]] | None = None,
        protocol: ProtocolVersion | None = None,
        will: Will | None = None,
        clean_session: bool | None = None,
        transport: Literal["tcp", "websockets", "unix"] = "tcp",
        timeout: float | None = None,
        keepalive: int = 60,
        bind_address: str = "",
        bind_port: int = 0,
        clean_start: mqtt.CleanStartOption = mqtt.MQTT_CLEAN_START_FIRST_ONLY,
        max_queued_incoming_messages: int | None = None,
        max_queued_outgoing_messages: int | None = None,
        max_inflight_messages: int | None = None,
        max_concurrent_outgoing_calls: int | None = None,
        properties: Properties | None = None,
        tls_context: ssl.SSLContext | None = None,
        tls_params: TLSParameters | None = None,
        tls_insecure: bool | None = None,
        proxy: ProxySettings | None = None,
        socket_options: Iterable[SocketOption] | None = None,
        websocket_path: str | None = None,
        websocket_headers: WebSocketHeaders | None = None,
    ):
        """
        Initialize MQTT client with connection parameters.

        Args:
            hostname (str): MQTT broker hostname or IP address
            port (int): MQTT broker port, defaults to 1883
            username (str, optional): Authentication username
            password (str, optional): Authentication password  
            logger (logging.Logger, optional): Custom logger instance
            identifier (str, optional): Client ID, auto-generated if None
            queue_type (type, optional): Custom message queue class
            protocol (ProtocolVersion, optional): MQTT protocol version
            will (Will, optional): Last will and testament message
            clean_session (bool, optional): Clean session flag for MQTT v3.1.1
            transport (str): Transport protocol - "tcp", "websockets", or "unix"
            timeout (float, optional): Default timeout for operations
            keepalive (int): Keep-alive interval in seconds
            bind_address (str): Local interface to bind to
            bind_port (int): Local port to bind to
            clean_start (mqtt.CleanStartOption): MQTT v5.0 clean start option
            max_queued_incoming_messages (int, optional): Incoming message queue limit
            max_queued_outgoing_messages (int, optional): Outgoing message queue limit
            max_inflight_messages (int, optional): Maximum inflight messages
            max_concurrent_outgoing_calls (int, optional): Concurrency limit
            properties (Properties, optional): MQTT v5.0 connection properties
            tls_context (ssl.SSLContext, optional): Pre-configured SSL context
            tls_params (TLSParameters, optional): SSL/TLS configuration parameters
            tls_insecure (bool, optional): Disable hostname verification
            proxy (ProxySettings, optional): Proxy configuration
            socket_options (Iterable, optional): Socket options
            websocket_path (str, optional): WebSocket path for websocket transport
            websocket_headers (WebSocketHeaders, optional): WebSocket headers
        """

    async def __aenter__(self) -> Self:
        """
        Connect to MQTT broker when entering async context.

        Returns:
            Self: The connected client instance

        Raises:
            MqttError: If connection fails
        """

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        tb: TracebackType | None,
    ) -> None:
        """
        Disconnect from MQTT broker when exiting async context.

        Args:
            exc_type: Exception type if context exited with exception
            exc: Exception instance if context exited with exception  
            tb: Traceback if context exited with exception
        """

Usage example:

import asyncio
from aiomqtt import Client

async def basic_connection():
    # Automatic connection and disconnection
    async with Client("test.mosquitto.org") as client:
        print(f"Connected with client ID: {client.identifier}")
        # Client automatically disconnects when exiting context

asyncio.run(basic_connection())

Message Publishing

Publish messages to MQTT topics with support for quality of service, retained messages, and MQTT v5.0 properties.

async def publish(
    self,
    /,
    topic: str,
    payload: PayloadType = None,
    qos: int = 0,
    retain: bool = False,
    properties: Properties | None = None,
    *args: Any,
    timeout: float | None = None,
    **kwargs: Any,
) -> None:
    """
    Publish a message to an MQTT topic.

    Args:
        topic (str): Target topic for the message
        payload (PayloadType, optional): Message payload, defaults to None
        qos (int): Quality of service level (0, 1, or 2), defaults to 0
        retain (bool): Whether to retain the message on the broker, defaults to False
        properties (Properties, optional): MQTT v5.0 message properties
        timeout (float, optional): Operation timeout, uses client default if None

    Raises:
        MqttError: If publish operation fails
        MqttCodeError: If broker returns an error code
    """

Usage examples:

import asyncio
from aiomqtt import Client

async def publish_examples():
    async with Client("test.mosquitto.org") as client:
        # Simple text message
        await client.publish("sensors/temperature", "23.5")
        
        # Binary payload
        await client.publish("sensors/image", b"binary_image_data")
        
        # Numeric payload  
        await client.publish("sensors/humidity", 65.2)
        
        # QoS 1 with retain flag
        await client.publish(
            "status/online", 
            "connected", 
            qos=1, 
            retain=True
        )
        
        # With custom timeout
        await client.publish(
            "slow/topic", 
            "data", 
            timeout=10.0
        )

asyncio.run(publish_examples())

Topic Subscription

Subscribe to MQTT topics and wildcards with support for multiple QoS levels and subscription options.

async def subscribe(
    self,
    /,
    topic: SubscribeTopic,
    qos: int = 0,
    options: SubscribeOptions | None = None,
    properties: Properties | None = None,
    *args: Any,
    timeout: float | None = None,
    **kwargs: Any,
) -> tuple[int, ...] | list[ReasonCode]:
    """
    Subscribe to one or more MQTT topics.

    Args:
        topic: Topic(s) to subscribe to - can be:
            - str: Single topic or wildcard pattern
            - Topic: Single topic object
            - Wildcard: Single wildcard object  
            - list: Multiple topics with optional QoS specifications
        qos (int): Quality of service level, defaults to 0
        options (SubscribeOptions, optional): MQTT v5.0 subscription options
        properties (Properties, optional): MQTT v5.0 subscription properties
        timeout (float, optional): Operation timeout, uses client default if None

    Returns:
        tuple[int, ...] | list[ReasonCode]: Granted QoS levels or reason codes

    Raises:
        MqttError: If subscription operation fails
        MqttCodeError: If broker returns an error code
    """

async def unsubscribe(
    self,
    topic: str | Topic | Wildcard | list[str | Topic | Wildcard],
    properties: Properties | None = None,
    timeout: float | None = None,
) -> None:
    """
    Unsubscribe from one or more MQTT topics.

    Args:
        topic: Topic(s) to unsubscribe from
        properties (Properties, optional): MQTT v5.0 properties
        timeout (float, optional): Operation timeout, uses client default if None

    Raises:
        MqttError: If unsubscribe operation fails
    """

Usage examples:

import asyncio
from aiomqtt import Client

async def subscription_examples():
    async with Client("test.mosquitto.org") as client:
        # Simple topic subscription
        await client.subscribe("sensors/temperature")
        
        # Wildcard subscriptions
        await client.subscribe("sensors/+/temperature")  # Single level wildcard
        await client.subscribe("sensors/#")  # Multi-level wildcard
        
        # Multiple topics with different QoS
        await client.subscribe([
            ("sensors/temperature", 0),
            ("sensors/humidity", 1),
            ("alerts/#", 2)
        ])
        
        # QoS 1 subscription
        await client.subscribe("important/data", qos=1)
        
        # Unsubscribe
        await client.unsubscribe("sensors/temperature")
        await client.unsubscribe(["sensors/+/temp", "alerts/#"])

asyncio.run(subscription_examples())

Message Reception

Access received messages through the async iterator interface provided by the messages property.

@property
def messages(self) -> MessagesIterator:
    """
    Get async iterator for received messages.

    Returns:
        MessagesIterator: Iterator for received messages
    """

class MessagesIterator:
    def __aiter__(self) -> AsyncIterator[Message]:
        """Return async iterator."""
    
    def __anext__(self) -> Message:
        """Get next message from queue."""
    
    def __len__(self) -> int:
        """
        Get number of queued messages.

        Returns:
            int: Number of messages in queue
        """

Usage example:

import asyncio
from aiomqtt import Client

async def message_reception():
    async with Client("test.mosquitto.org") as client:
        await client.subscribe("sensors/#")
        
        # Receive messages indefinitely
        async for message in client.messages:
            print(f"Topic: {message.topic}")
            print(f"Payload: {message.payload}")
            print(f"QoS: {message.qos}")
            
            # Process specific topics
            if message.topic.matches("sensors/temperature"):
                temperature = float(message.payload)
                print(f"Temperature: {temperature}°C")
            
            # Check queue length
            print(f"Messages in queue: {len(client.messages)}")

asyncio.run(message_reception())

Client Properties

Access client configuration and status information.

@property
def identifier(self) -> str:
    """
    Get client identifier.

    Returns:
        str: MQTT client identifier
    """

@property
def pending_calls_threshold(self) -> int:
    """
    Get warning threshold for pending calls.

    Returns:
        int: Threshold value for pending calls warning
    """

@property
def timeout(self) -> float:
    """
    Get default timeout value.

    Returns:
        float: Default timeout in seconds
    """

Type Definitions

PayloadType = str | bytes | bytearray | int | float | None

SubscribeTopic = (
    str 
    | tuple[str, SubscribeOptions] 
    | list[tuple[str, SubscribeOptions]] 
    | list[tuple[str, int]]
)

WebSocketHeaders = dict[str, str] | Callable[[dict[str, str]], dict[str, str]]

PahoSocket = socket.socket | ssl.SSLSocket | Any

SocketOption = tuple[int, int, int | bytes] | tuple[int, int, None, int]

Install with Tessl CLI

npx tessl i tessl/pypi-aiomqtt

docs

configuration-security.md

core-client.md

error-handling.md

index.md

message-handling.md

topic-management.md

tile.json