CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aiomqtt

The idiomatic asyncio MQTT client

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

configuration-security.mddocs/

Configuration & Security

TLS/SSL configuration, authentication, protocol versions, proxy settings, and connection parameters for secure and customized MQTT connections.

Capabilities

Protocol Version Selection

Choose the MQTT protocol version for compatibility with different brokers and feature requirements.

class ProtocolVersion(Enum):
    V31 = mqtt.MQTTv31
    V311 = mqtt.MQTTv311  
    V5 = mqtt.MQTTv5

Protocol version differences:

  • V31 (MQTT 3.1): Original protocol, basic features
  • V311 (MQTT 3.1.1): Improved error handling, widely supported
  • V5 (MQTT 5.0): Latest version with enhanced features, properties, reason codes

Usage examples:

import asyncio
from aiomqtt import Client, ProtocolVersion

async def protocol_version_examples():
    # Use MQTT v5.0 for latest features
    async with Client(
        "test.mosquitto.org", 
        protocol=ProtocolVersion.V5
    ) as client:
        await client.publish("test/v5", "Using MQTT 5.0")
    
    # Use MQTT v3.1.1 for compatibility
    async with Client(
        "broker.example.com",
        protocol=ProtocolVersion.V311
    ) as client:
        await client.publish("test/v311", "Using MQTT 3.1.1")
    
    # Default protocol (let client decide)
    async with Client("test.mosquitto.org") as client:
        await client.publish("test/default", "Using default protocol")

asyncio.run(protocol_version_examples())

TLS/SSL Configuration

Secure MQTT connections using TLS/SSL with certificate-based authentication and custom SSL contexts.

@dataclass
class TLSParameters:
    ca_certs: str | None = None
    certfile: str | None = None
    keyfile: str | None = None
    cert_reqs: ssl.VerifyMode | None = None
    tls_version: Any | None = None
    ciphers: str | None = None
    keyfile_password: str | None = None

TLS Configuration Parameters:

  • ca_certs: Path to Certificate Authority certificates file
  • certfile: Path to client certificate file
  • keyfile: Path to client private key file
  • cert_reqs: Certificate verification requirements
  • tls_version: Specific TLS version to use
  • ciphers: Allowed cipher suites
  • keyfile_password: Password for encrypted private key

Usage examples:

import asyncio
import ssl
from aiomqtt import Client, TLSParameters

async def tls_configuration_examples():
    # Basic TLS with CA certificate verification
    tls_params = TLSParameters(
        ca_certs="/path/to/ca-certificates.crt"
    )
    
    async with Client(
        "secure-broker.example.com",
        port=8883,  # Standard MQTT TLS port
        tls_params=tls_params
    ) as client:
        await client.publish("secure/test", "TLS enabled")
    
    # Mutual TLS authentication with client certificates
    mutual_tls_params = TLSParameters(
        ca_certs="/path/to/ca-cert.pem",
        certfile="/path/to/client-cert.pem", 
        keyfile="/path/to/client-key.pem",
        keyfile_password="key_password"
    )
    
    async with Client(
        "mtls-broker.example.com",
        port=8883,
        tls_params=mutual_tls_params
    ) as client:
        await client.publish("mtls/test", "Mutual TLS authenticated")
    
    # Custom SSL context for advanced configuration
    ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    
    async with Client(
        "test-broker.example.com",
        port=8883,
        tls_context=ssl_context
    ) as client:
        await client.publish("custom-ssl/test", "Custom SSL context")
    
    # Disable hostname verification (not recommended for production)
    async with Client(
        "test-broker.example.com",
        port=8883,
        tls_params=TLSParameters(ca_certs="/path/to/ca.crt"),
        tls_insecure=True
    ) as client:
        await client.publish("insecure/test", "Hostname verification disabled")

asyncio.run(tls_configuration_examples())

Authentication Configuration

Configure username/password authentication and client identification.

Client authentication parameters:

# In Client.__init__():
username: str | None = None      # Authentication username
password: str | None = None      # Authentication password  
identifier: str | None = None    # Client ID (auto-generated if None)

Usage examples:

import asyncio
from aiomqtt import Client

async def authentication_examples():
    # Username/password authentication
    async with Client(
        "broker.example.com",
        username="mqtt_user",
        password="secure_password"
    ) as client:
        await client.publish("auth/test", "Authenticated connection")
    
    # Custom client identifier
    async with Client(
        "broker.example.com", 
        identifier="my-custom-client-id"
    ) as client:
        print(f"Connected with ID: {client.identifier}")
        await client.publish("id/test", "Custom client ID")
    
    # Auto-generated client ID (default behavior)
    async with Client("broker.example.com") as client:
        print(f"Auto-generated ID: {client.identifier}")
        await client.publish("auto-id/test", "Auto-generated client ID")

asyncio.run(authentication_examples())

Proxy Configuration

Connect through SOCKS or HTTP proxies for network environments requiring proxy access.

class ProxySettings:
    def __init__(
        self,
        *,
        proxy_type: int,
        proxy_addr: str,
        proxy_rdns: bool | None = True,
        proxy_username: str | None = None,
        proxy_password: str | None = None,
        proxy_port: int | None = None,
    ):
        """
        Configure proxy settings for MQTT connection.

        Args:
            proxy_type (int): Proxy type (SOCKS4, SOCKS5, HTTP)
            proxy_addr (str): Proxy server address
            proxy_rdns (bool, optional): Use remote DNS resolution
            proxy_username (str, optional): Proxy authentication username
            proxy_password (str, optional): Proxy authentication password
            proxy_port (int, optional): Proxy server port
        """

Usage example:

import asyncio
import socks
from aiomqtt import Client, ProxySettings

async def proxy_configuration():
    # SOCKS5 proxy configuration
    proxy_settings = ProxySettings(
        proxy_type=socks.SOCKS5,
        proxy_addr="proxy.example.com",
        proxy_port=1080,
        proxy_username="proxy_user",
        proxy_password="proxy_pass"
    )
    
    async with Client(
        "broker.example.com",
        proxy=proxy_settings
    ) as client:
        await client.publish("proxy/test", "Connected through SOCKS5 proxy")

# Note: Requires python-socks package
# pip install python-socks
asyncio.run(proxy_configuration())

Last Will and Testament

Configure last will messages to be published automatically when the client disconnects unexpectedly.

@dataclass
class Will:
    topic: str
    payload: str | bytes | bytearray | int | float | None = None
    qos: int = 0
    retain: bool = False
    properties: Properties | None = None

Usage examples:

import asyncio
from aiomqtt import Client, Will

async def will_message_examples():
    # Basic will message
    will = Will(
        topic="status/device123",
        payload="offline",
        qos=1,
        retain=True
    )
    
    async with Client(
        "broker.example.com",
        will=will
    ) as client:
        # Publish online status
        await client.publish("status/device123", "online", qos=1, retain=True)
        
        # Do work - if client disconnects unexpectedly,
        # broker will publish the will message
        await client.publish("data/sensor", "42.5")
    
    # JSON will message with device information
    import json
    device_info = {
        "device_id": "sensor123",
        "status": "offline",
        "timestamp": "2023-01-01T00:00:00Z",
        "reason": "unexpected_disconnect"
    }
    
    will_with_json = Will(
        topic="devices/sensor123/status",
        payload=json.dumps(device_info),
        qos=2,
        retain=True
    )
    
    async with Client(
        "broker.example.com",
        will=will_with_json
    ) as client:
        await client.publish("devices/sensor123/data", "25.4")

asyncio.run(will_message_examples())

Connection Parameters

Configure connection behavior, timeouts, and session management.

Key connection parameters:

# In Client.__init__():
keepalive: int = 60                    # Keep-alive interval in seconds
timeout: float | None = None           # Default operation timeout
clean_session: bool | None = None      # Clean session flag (MQTT v3.1.1)
clean_start: mqtt.CleanStartOption     # Clean start option (MQTT v5.0)
bind_address: str = ""                 # Local interface to bind
bind_port: int = 0                     # Local port to bind

Usage examples:

import asyncio
from aiomqtt import Client
import paho.mqtt.client as mqtt

async def connection_parameter_examples():
    # Custom keepalive and timeout
    async with Client(
        "broker.example.com",
        keepalive=30,      # Send keepalive every 30 seconds
        timeout=10.0       # 10 second default timeout
    ) as client:
        await client.publish("config/test", "Custom keepalive/timeout")
    
    # Clean session configuration (MQTT v3.1.1)
    async with Client(
        "broker.example.com",
        clean_session=False  # Persistent session
    ) as client:
        await client.subscribe("persistent/data", qos=1)
        # Session will be preserved after disconnect
    
    # Clean start configuration (MQTT v5.0)
    async with Client(
        "broker.example.com",
        clean_start=mqtt.MQTT_CLEAN_START_FIRST_ONLY
    ) as client:
        await client.publish("v5/test", "Clean start configuration")
    
    # Bind to specific local interface
    async with Client(
        "broker.example.com",
        bind_address="192.168.1.100",  # Bind to specific interface
        bind_port=0                     # Let system choose port
    ) as client:
        await client.publish("bind/test", "Bound to specific interface")

asyncio.run(connection_parameter_examples())

WebSocket Configuration

Configure WebSocket transport for browser compatibility or firewall traversal.

WebSocket parameters:

# In Client.__init__():
transport: Literal["tcp", "websockets", "unix"] = "tcp"
websocket_path: str | None = None       # WebSocket path
websocket_headers: WebSocketHeaders | None = None  # Custom headers

Usage examples:

import asyncio
from aiomqtt import Client

async def websocket_configuration():
    # Basic WebSocket connection
    async with Client(
        "broker.example.com",
        port=9001,  # WebSocket port
        transport="websockets"
    ) as client:
        await client.publish("websocket/test", "WebSocket transport")
    
    # WebSocket with custom path and headers
    custom_headers = {
        "Authorization": "Bearer token123",
        "X-Client-Type": "aiomqtt"
    }
    
    async with Client(
        "broker.example.com",
        port=9001,
        transport="websockets",
        websocket_path="/mqtt/websocket",
        websocket_headers=custom_headers
    ) as client:
        await client.publish("websocket/custom", "Custom WebSocket config")

asyncio.run(websocket_configuration())

Advanced Configuration

Configure message queue limits, concurrency, and socket options for performance tuning.

Advanced parameters:

# In Client.__init__():
max_queued_incoming_messages: int | None = None    # Incoming queue limit
max_queued_outgoing_messages: int | None = None    # Outgoing queue limit  
max_inflight_messages: int | None = None           # Max inflight messages
max_concurrent_outgoing_calls: int | None = None   # Concurrency limit
socket_options: Iterable[SocketOption] | None = None  # Socket options

Usage example:

import asyncio
import socket
from aiomqtt import Client

async def advanced_configuration():
    # Configure message queue limits and concurrency
    async with Client(
        "broker.example.com",
        max_queued_incoming_messages=1000,    # Limit incoming queue
        max_queued_outgoing_messages=500,     # Limit outgoing queue
        max_inflight_messages=20,             # Max unacknowledged messages
        max_concurrent_outgoing_calls=10      # Max concurrent operations
    ) as client:
        await client.publish("advanced/test", "Advanced configuration")
    
    # Custom socket options
    socket_opts = [
        (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),    # Enable keepalive
        (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)     # Disable Nagle algorithm
    ]
    
    async with Client(
        "broker.example.com",
        socket_options=socket_opts
    ) as client:
        await client.publish("socket/test", "Custom socket options")

asyncio.run(advanced_configuration())

Type Definitions

WebSocketHeaders = dict[str, str] | Callable[[dict[str, str]], dict[str, str]]

SocketOption = tuple[int, int, int | bytes] | tuple[int, int, None, int]

Install with Tessl CLI

npx tessl i tessl/pypi-aiomqtt

docs

configuration-security.md

core-client.md

error-handling.md

index.md

message-handling.md

topic-management.md

tile.json