The idiomatic asyncio MQTT client
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
TLS/SSL configuration, authentication, protocol versions, proxy settings, and connection parameters for secure and customized MQTT connections.
Choose the MQTT protocol version for compatibility with different brokers and feature requirements.
class ProtocolVersion(Enum):
V31 = mqtt.MQTTv31
V311 = mqtt.MQTTv311
V5 = mqtt.MQTTv5Protocol version differences:
Usage examples:
import asyncio
from aiomqtt import Client, ProtocolVersion
async def protocol_version_examples():
# Use MQTT v5.0 for latest features
async with Client(
"test.mosquitto.org",
protocol=ProtocolVersion.V5
) as client:
await client.publish("test/v5", "Using MQTT 5.0")
# Use MQTT v3.1.1 for compatibility
async with Client(
"broker.example.com",
protocol=ProtocolVersion.V311
) as client:
await client.publish("test/v311", "Using MQTT 3.1.1")
# Default protocol (let client decide)
async with Client("test.mosquitto.org") as client:
await client.publish("test/default", "Using default protocol")
asyncio.run(protocol_version_examples())Secure MQTT connections using TLS/SSL with certificate-based authentication and custom SSL contexts.
@dataclass
class TLSParameters:
ca_certs: str | None = None
certfile: str | None = None
keyfile: str | None = None
cert_reqs: ssl.VerifyMode | None = None
tls_version: Any | None = None
ciphers: str | None = None
keyfile_password: str | None = NoneTLS Configuration Parameters:
Usage examples:
import asyncio
import ssl
from aiomqtt import Client, TLSParameters
async def tls_configuration_examples():
# Basic TLS with CA certificate verification
tls_params = TLSParameters(
ca_certs="/path/to/ca-certificates.crt"
)
async with Client(
"secure-broker.example.com",
port=8883, # Standard MQTT TLS port
tls_params=tls_params
) as client:
await client.publish("secure/test", "TLS enabled")
# Mutual TLS authentication with client certificates
mutual_tls_params = TLSParameters(
ca_certs="/path/to/ca-cert.pem",
certfile="/path/to/client-cert.pem",
keyfile="/path/to/client-key.pem",
keyfile_password="key_password"
)
async with Client(
"mtls-broker.example.com",
port=8883,
tls_params=mutual_tls_params
) as client:
await client.publish("mtls/test", "Mutual TLS authenticated")
# Custom SSL context for advanced configuration
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
async with Client(
"test-broker.example.com",
port=8883,
tls_context=ssl_context
) as client:
await client.publish("custom-ssl/test", "Custom SSL context")
# Disable hostname verification (not recommended for production)
async with Client(
"test-broker.example.com",
port=8883,
tls_params=TLSParameters(ca_certs="/path/to/ca.crt"),
tls_insecure=True
) as client:
await client.publish("insecure/test", "Hostname verification disabled")
asyncio.run(tls_configuration_examples())Configure username/password authentication and client identification.
Client authentication parameters:
# In Client.__init__():
username: str | None = None # Authentication username
password: str | None = None # Authentication password
identifier: str | None = None # Client ID (auto-generated if None)Usage examples:
import asyncio
from aiomqtt import Client
async def authentication_examples():
# Username/password authentication
async with Client(
"broker.example.com",
username="mqtt_user",
password="secure_password"
) as client:
await client.publish("auth/test", "Authenticated connection")
# Custom client identifier
async with Client(
"broker.example.com",
identifier="my-custom-client-id"
) as client:
print(f"Connected with ID: {client.identifier}")
await client.publish("id/test", "Custom client ID")
# Auto-generated client ID (default behavior)
async with Client("broker.example.com") as client:
print(f"Auto-generated ID: {client.identifier}")
await client.publish("auto-id/test", "Auto-generated client ID")
asyncio.run(authentication_examples())Connect through SOCKS or HTTP proxies for network environments requiring proxy access.
class ProxySettings:
def __init__(
self,
*,
proxy_type: int,
proxy_addr: str,
proxy_rdns: bool | None = True,
proxy_username: str | None = None,
proxy_password: str | None = None,
proxy_port: int | None = None,
):
"""
Configure proxy settings for MQTT connection.
Args:
proxy_type (int): Proxy type (SOCKS4, SOCKS5, HTTP)
proxy_addr (str): Proxy server address
proxy_rdns (bool, optional): Use remote DNS resolution
proxy_username (str, optional): Proxy authentication username
proxy_password (str, optional): Proxy authentication password
proxy_port (int, optional): Proxy server port
"""Usage example:
import asyncio
import socks
from aiomqtt import Client, ProxySettings
async def proxy_configuration():
# SOCKS5 proxy configuration
proxy_settings = ProxySettings(
proxy_type=socks.SOCKS5,
proxy_addr="proxy.example.com",
proxy_port=1080,
proxy_username="proxy_user",
proxy_password="proxy_pass"
)
async with Client(
"broker.example.com",
proxy=proxy_settings
) as client:
await client.publish("proxy/test", "Connected through SOCKS5 proxy")
# Note: Requires python-socks package
# pip install python-socks
asyncio.run(proxy_configuration())Configure last will messages to be published automatically when the client disconnects unexpectedly.
@dataclass
class Will:
topic: str
payload: str | bytes | bytearray | int | float | None = None
qos: int = 0
retain: bool = False
properties: Properties | None = NoneUsage examples:
import asyncio
from aiomqtt import Client, Will
async def will_message_examples():
# Basic will message
will = Will(
topic="status/device123",
payload="offline",
qos=1,
retain=True
)
async with Client(
"broker.example.com",
will=will
) as client:
# Publish online status
await client.publish("status/device123", "online", qos=1, retain=True)
# Do work - if client disconnects unexpectedly,
# broker will publish the will message
await client.publish("data/sensor", "42.5")
# JSON will message with device information
import json
device_info = {
"device_id": "sensor123",
"status": "offline",
"timestamp": "2023-01-01T00:00:00Z",
"reason": "unexpected_disconnect"
}
will_with_json = Will(
topic="devices/sensor123/status",
payload=json.dumps(device_info),
qos=2,
retain=True
)
async with Client(
"broker.example.com",
will=will_with_json
) as client:
await client.publish("devices/sensor123/data", "25.4")
asyncio.run(will_message_examples())Configure connection behavior, timeouts, and session management.
Key connection parameters:
# In Client.__init__():
keepalive: int = 60 # Keep-alive interval in seconds
timeout: float | None = None # Default operation timeout
clean_session: bool | None = None # Clean session flag (MQTT v3.1.1)
clean_start: mqtt.CleanStartOption # Clean start option (MQTT v5.0)
bind_address: str = "" # Local interface to bind
bind_port: int = 0 # Local port to bindUsage examples:
import asyncio
from aiomqtt import Client
import paho.mqtt.client as mqtt
async def connection_parameter_examples():
# Custom keepalive and timeout
async with Client(
"broker.example.com",
keepalive=30, # Send keepalive every 30 seconds
timeout=10.0 # 10 second default timeout
) as client:
await client.publish("config/test", "Custom keepalive/timeout")
# Clean session configuration (MQTT v3.1.1)
async with Client(
"broker.example.com",
clean_session=False # Persistent session
) as client:
await client.subscribe("persistent/data", qos=1)
# Session will be preserved after disconnect
# Clean start configuration (MQTT v5.0)
async with Client(
"broker.example.com",
clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY
) as client:
await client.publish("v5/test", "Clean start configuration")
# Bind to specific local interface
async with Client(
"broker.example.com",
bind_address="192.168.1.100", # Bind to specific interface
bind_port=0 # Let system choose port
) as client:
await client.publish("bind/test", "Bound to specific interface")
asyncio.run(connection_parameter_examples())Configure WebSocket transport for browser compatibility or firewall traversal.
WebSocket parameters:
# In Client.__init__():
transport: Literal["tcp", "websockets", "unix"] = "tcp"
websocket_path: str | None = None # WebSocket path
websocket_headers: WebSocketHeaders | None = None # Custom headersUsage examples:
import asyncio
from aiomqtt import Client
async def websocket_configuration():
# Basic WebSocket connection
async with Client(
"broker.example.com",
port=9001, # WebSocket port
transport="websockets"
) as client:
await client.publish("websocket/test", "WebSocket transport")
# WebSocket with custom path and headers
custom_headers = {
"Authorization": "Bearer token123",
"X-Client-Type": "aiomqtt"
}
async with Client(
"broker.example.com",
port=9001,
transport="websockets",
websocket_path="/mqtt/websocket",
websocket_headers=custom_headers
) as client:
await client.publish("websocket/custom", "Custom WebSocket config")
asyncio.run(websocket_configuration())Configure message queue limits, concurrency, and socket options for performance tuning.
Advanced parameters:
# In Client.__init__():
max_queued_incoming_messages: int | None = None # Incoming queue limit
max_queued_outgoing_messages: int | None = None # Outgoing queue limit
max_inflight_messages: int | None = None # Max inflight messages
max_concurrent_outgoing_calls: int | None = None # Concurrency limit
socket_options: Iterable[SocketOption] | None = None # Socket optionsUsage example:
import asyncio
import socket
from aiomqtt import Client
async def advanced_configuration():
# Configure message queue limits and concurrency
async with Client(
"broker.example.com",
max_queued_incoming_messages=1000, # Limit incoming queue
max_queued_outgoing_messages=500, # Limit outgoing queue
max_inflight_messages=20, # Max unacknowledged messages
max_concurrent_outgoing_calls=10 # Max concurrent operations
) as client:
await client.publish("advanced/test", "Advanced configuration")
# Custom socket options
socket_opts = [
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), # Enable keepalive
(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # Disable Nagle algorithm
]
async with Client(
"broker.example.com",
socket_options=socket_opts
) as client:
await client.publish("socket/test", "Custom socket options")
asyncio.run(advanced_configuration())WebSocketHeaders = dict[str, str] | Callable[[dict[str, str]], dict[str, str]]
SocketOption = tuple[int, int, int | bytes] | tuple[int, int, None, int]Install with Tessl CLI
npx tessl i tessl/pypi-aiomqtt