The idiomatic asyncio MQTT client
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Essential MQTT client functionality including connection management, publishing, and subscribing. The Client class serves as the main entry point for all MQTT operations using async context manager patterns.
The Client class provides async context manager support for automatic connection and disconnection management, eliminating the need for manual connection lifecycle handling.
class Client:
def __init__(
    self,
    hostname: str,
    port: int = 1883,
    *,
    username: str | None = None,
    password: str | None = None,
    logger: logging.Logger | None = None,
    identifier: str | None = None,
    queue_type: type[asyncio.Queue[Message]] | None = None,
    protocol: ProtocolVersion | None = None,
    will: Will | None = None,
    clean_session: bool | None = None,
    transport: Literal["tcp", "websockets", "unix"] = "tcp",
    timeout: float | None = None,
    keepalive: int = 60,
    bind_address: str = "",
    bind_port: int = 0,
    clean_start: mqtt.CleanStartOption = mqtt.MQTT_CLEAN_START_FIRST_ONLY,
    max_queued_incoming_messages: int | None = None,
    max_queued_outgoing_messages: int | None = None,
    max_inflight_messages: int | None = None,
    max_concurrent_outgoing_calls: int | None = None,
    properties: Properties | None = None,
    tls_context: ssl.SSLContext | None = None,
    tls_params: TLSParameters | None = None,
    tls_insecure: bool | None = None,
    proxy: ProxySettings | None = None,
    socket_options: Iterable[SocketOption] | None = None,
    websocket_path: str | None = None,
    websocket_headers: WebSocketHeaders | None = None,
):
    """Create an MQTT client configured with the given connection parameters.

    Everything after ``port`` is keyword-only.

    Args:
        hostname: Hostname or IP address of the MQTT broker.
        port: Broker port; 1883 unless overridden.
        username: User name used for authentication.
        password: Password used for authentication.
        logger: Custom logger instance.
        identifier: Client ID; auto-generated when ``None``.
        queue_type: Custom class for the message queue.
        protocol: MQTT protocol version.
        will: Last-will-and-testament message.
        clean_session: Clean-session flag (MQTT v3.1.1).
        transport: Transport protocol: ``"tcp"``, ``"websockets"`` or
            ``"unix"``.
        timeout: Default timeout applied to operations.
        keepalive: Keep-alive interval, in seconds.
        bind_address: Local interface to bind to.
        bind_port: Local port to bind to.
        clean_start: Clean-start option (MQTT v5.0).
        max_queued_incoming_messages: Limit on the incoming message queue.
        max_queued_outgoing_messages: Limit on the outgoing message queue.
        max_inflight_messages: Maximum number of in-flight messages.
        max_concurrent_outgoing_calls: Concurrency limit for outgoing calls.
        properties: Connection properties (MQTT v5.0).
        tls_context: Pre-configured SSL context.
        tls_params: SSL/TLS configuration parameters.
        tls_insecure: Disables hostname verification.
        proxy: Proxy configuration.
        socket_options: Socket options.
        websocket_path: WebSocket path, for the websocket transport.
        websocket_headers: WebSocket headers.
    """
async def __aenter__(self) -> Self:
    """Connect to the MQTT broker on entry to the ``async with`` block.

    Returns:
        The client instance itself, now connected.

    Raises:
        MqttError: If the connection attempt fails.
    """
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: TracebackType | None,
) -> None:
"""
Disconnect from MQTT broker when exiting async context.
Args:
exc_type: Exception type if context exited with exception
exc: Exception instance if context exited with exception
tb: Traceback if context exited with exception
"""

Usage example:
import asyncio
from aiomqtt import Client
async def basic_connection():
    """Open a connection, print the client ID, and let the context close it."""
    # Entering the context connects; leaving it disconnects.
    async with Client("test.mosquitto.org") as client:
        print(f"Connected with client ID: {client.identifier}")
    # No explicit disconnect call is needed after the block above.
asyncio.run(basic_connection())

Publish messages to MQTT topics with support for quality of service, retained messages, and MQTT v5.0 properties.
async def publish(
self,
/,
topic: str,
payload: PayloadType = None,
qos: int = 0,
retain: bool = False,
properties: Properties | None = None,
*args: Any,
timeout: float | None = None,
**kwargs: Any,
) -> None:
"""
Publish a message to an MQTT topic.
Args:
topic (str): Target topic for the message
payload (PayloadType, optional): Message payload, defaults to None
qos (int): Quality of service level (0, 1, or 2), defaults to 0
retain (bool): Whether to retain the message on the broker, defaults to False
properties (Properties, optional): MQTT v5.0 message properties
timeout (float, optional): Operation timeout, uses client default if None
Raises:
MqttError: If publish operation fails
MqttCodeError: If broker returns an error code
"""

Usage examples:
import asyncio
from aiomqtt import Client
async def publish_examples():
    """Publish a handful of messages showing the supported payloads and options."""
    async with Client("test.mosquitto.org") as client:
        # Payload given as text.
        await client.publish("sensors/temperature", "23.5")

        # Payload given as raw bytes.
        await client.publish("sensors/image", b"binary_image_data")

        # Payload given as a number.
        await client.publish("sensors/humidity", 65.2)

        # QoS 1, with the retain flag set.
        await client.publish("status/online", "connected", qos=1, retain=True)

        # Per-call timeout instead of the client default.
        await client.publish("slow/topic", "data", timeout=10.0)
asyncio.run(publish_examples())

Subscribe to MQTT topics and wildcards with support for multiple QoS levels and subscription options.
async def subscribe(
    self,
    /,
    topic: SubscribeTopic,
    qos: int = 0,
    options: SubscribeOptions | None = None,
    properties: Properties | None = None,
    *args: Any,
    timeout: float | None = None,
    **kwargs: Any,
) -> tuple[int, ...] | list[ReasonCode]:
    """Subscribe to a single MQTT topic or to several at once.

    Args:
        topic: What to subscribe to. May be any of:

            - a ``str`` holding one topic or wildcard pattern,
            - a single ``Topic`` object,
            - a single ``Wildcard`` object,
            - a ``list`` of topics, each optionally paired with a QoS.
        qos: Quality-of-service level; 0 unless overridden.
        options: Subscription options (MQTT v5.0).
        properties: Subscription properties (MQTT v5.0).
        timeout: Timeout for this operation; the client default applies
            when ``None``.

    Returns:
        The granted QoS levels, or the reason codes.

    Raises:
        MqttError: If the subscription operation fails.
        MqttCodeError: If the broker returns an error code.
    """
async def unsubscribe(
self,
topic: str | Topic | Wildcard | list[str | Topic | Wildcard],
properties: Properties | None = None,
timeout: float | None = None,
) -> None:
"""
Unsubscribe from one or more MQTT topics.
Args:
topic: Topic(s) to unsubscribe from
properties (Properties, optional): MQTT v5.0 properties
timeout (float, optional): Operation timeout, uses client default if None
Raises:
MqttError: If unsubscribe operation fails
"""

Usage examples:
import asyncio
from aiomqtt import Client
async def subscription_examples():
    """Show the supported ways to subscribe, then undo some subscriptions."""
    async with Client("test.mosquitto.org") as client:
        # Simple topic subscription
        await client.subscribe("sensors/temperature")

        # Wildcard subscriptions
        await client.subscribe("sensors/+/temperature")  # Single level wildcard
        await client.subscribe("sensors/#")  # Multi-level wildcard

        # Multiple topics with different QoS
        await client.subscribe(
            [
                ("sensors/temperature", 0),
                ("sensors/humidity", 1),
                ("alerts/#", 2),
            ]
        )

        # QoS 1 subscription
        await client.subscribe("important/data", qos=1)

        # Unsubscribe. Every filter below matches one subscribed above, so
        # each call removes a subscription that really exists.
        await client.unsubscribe("sensors/temperature")
        await client.unsubscribe(["sensors/+/temperature", "alerts/#"])
asyncio.run(subscription_examples())

Access received messages through the async iterator interface provided by the messages property.
@property
def messages(self) -> MessagesIterator:
    """Async iterator over the messages this client has received.

    Returns:
        The iterator yielding received messages.
    """
class MessagesIterator:
def __aiter__(self) -> AsyncIterator[Message]:
    """Provide the async iterator used by ``async for``."""
async def __anext__(self) -> Message:
    """Return the next message from the queue.

    Declared ``async`` so that calling it produces an awaitable, which is
    what the asynchronous-iterator protocol behind ``async for`` requires.

    Returns:
        The next received message.
    """
def __len__(self) -> int:
"""
Get number of queued messages.
Returns:
int: Number of messages in queue
"""

Usage example:
import asyncio
from aiomqtt import Client
# Example: subscribe to every topic under "sensors/" and print each message
# as it is received.
# NOTE(review): indentation was lost in this listing; the trailing queue-length
# print is presumably part of the `async for` body — confirm against the
# original example.
async def message_reception():
async with Client("test.mosquitto.org") as client:
await client.subscribe("sensors/#")
# Receive messages indefinitely: iterate over the client's message iterator
async for message in client.messages:
print(f"Topic: {message.topic}")
print(f"Payload: {message.payload}")
print(f"QoS: {message.qos}")
# Process specific topics: temperature readings are also parsed as a float
if message.topic.matches("sensors/temperature"):
temperature = float(message.payload)
print(f"Temperature: {temperature}°C")
# Check queue length via len() on the message iterator
print(f"Messages in queue: {len(client.messages)}")
asyncio.run(message_reception())

Access client configuration and status information.
@property
def identifier(self) -> str:
    """The identifier of this client.

    Returns:
        The MQTT client identifier.
    """
@property
def pending_calls_threshold(self) -> int:
    """The pending-call count at which a warning is issued.

    Returns:
        The threshold value for the pending-calls warning.
    """
@property
def timeout(self) -> float:
"""
Get default timeout value.
Returns:
float: Default timeout in seconds
"""

PayloadType = str | bytes | bytearray | int | float | None
# Accepted forms of the ``topic`` argument of ``Client.subscribe``: a single
# topic string, one (topic, options) pair, or a list of (topic, options) or
# (topic, qos) pairs.
SubscribeTopic = (
str
| tuple[str, SubscribeOptions]
| list[tuple[str, SubscribeOptions]]
| list[tuple[str, int]]
)
# Type of the ``websocket_headers`` constructor argument: either a mapping of
# header names to values, or a callable that takes such a mapping and returns one.
WebSocketHeaders = dict[str, str] | Callable[[dict[str, str]], dict[str, str]]
# A plain socket, an SSL socket, or any other object.
# NOTE(review): presumably the socket type exposed by the underlying paho
# client — confirm.
PahoSocket = socket.socket | ssl.SSLSocket | Any
SocketOption = tuple[int, int, int | bytes] | tuple[int, int, None, int]

Install with Tessl CLI
npx tessl i tessl/pypi-aiomqtt