FastAPI-MQTT provides MQTT integration for FastAPI applications with decorator-based event handlers
npx @tessl/cli install tessl/pypi-fastapi-mqtt@2.2.0
FastAPI-MQTT provides MQTT integration for FastAPI applications with decorator-based event handlers. Built as a wrapper around the gmqtt module, it supports the MQTT 5.0 protocol and enables machine-to-machine communication in low-bandwidth environments through decorator-based callback methods and publish/subscribe functionality.
pip install fastapi-mqtt
from fastapi_mqtt import FastMQTT, MQTTConfig
Full import with MQTTClient:
from fastapi_mqtt import FastMQTT, MQTTConfig, MQTTClient
For handler type hints:
from gmqtt import Client as MQTTClient
from contextlib import asynccontextmanager
from typing import Any
from fastapi import FastAPI
from gmqtt import Client as MQTTClient
from fastapi_mqtt import FastMQTT, MQTTConfig
# Configure MQTT connection
# NOTE(review): host, username and password look like sample values —
# replace them with your broker's details.
mqtt_config = MQTTConfig(
    host="mqtt.broker.com",
    port=1883,
    keepalive=60,
    username="user",
    password="password"
)
# Create FastMQTT instance
fast_mqtt = FastMQTT(config=mqtt_config)
# FastAPI lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Connect to the MQTT broker on startup and disconnect on shutdown.

    The shutdown call sits in a ``finally`` clause so the MQTT client is
    still disconnected when an exception or cancellation is raised into
    this generator at the ``yield`` point. If ``mqtt_startup()`` itself
    fails, no shutdown is attempted.
    """
    await fast_mqtt.mqtt_startup()
    try:
        yield
    finally:
        await fast_mqtt.mqtt_shutdown()


app = FastAPI(lifespan=lifespan)
# Connection handler
@fast_mqtt.on_connect()
def connect(
    client: MQTTClient,
    flags: int,
    rc: int,
    properties: Any,
):
    """Subscribe to ``/mqtt/data`` on connect, then print a confirmation."""
    data_topic = "/mqtt/data"
    client.subscribe(data_topic)
    print("Connected to MQTT broker")
# Topic-specific subscription
@fast_mqtt.subscribe("sensors/+/temperature", qos=1)
async def temperature_handler(
    client: MQTTClient,
    topic: str,
    payload: bytes,
    qos: int,
    properties: Any,
):
    """Print the decoded payload of a message on ``sensors/+/temperature``."""
    # UTF-8 is what bytes.decode() uses by default; spelled out for clarity.
    temp_data = payload.decode("utf-8")
    print(f"Temperature from {topic}: {temp_data}")
# Global message handler
@fast_mqtt.on_message()
async def message_handler(
    client: MQTTClient,
    topic: str,
    payload: bytes,
    qos: int,
    properties: Any,
):
    """Print the topic and decoded payload of a received message."""
    body = payload.decode()
    print(f"Message from {topic}: {body}")
# API endpoint to publish messages
@app.get("/publish/{topic}/{message}")
async def publish_message(topic: str, message: str):
fast_mqtt.publish(topic, message)
return {"status": "published", "topic": topic, "message": message}
FastAPI-MQTT follows a decorator-based pattern for event handling:
MQTT connection configuration using Pydantic BaseModel with support for authentication, SSL, reconnection, and last will messages.
class MQTTConfig:
"""
MQTT connection configuration.
Parameters:
- host: MQTT broker hostname (default: "localhost")
- port: MQTT broker port (default: 1883)
- ssl: SSL/TLS configuration (bool or SSLContext, default: False)
- keepalive: Keep-alive interval in seconds (default: 60)
- username: Authentication username (optional)
- password: Authentication password (optional)
- version: MQTT protocol version (default: MQTTv50, range: 4-5)
- reconnect_retries: Number of reconnection attempts (default: 1)
- reconnect_delay: Delay between reconnections in seconds (default: 6)
- will_message_topic: Last will message topic (optional)
- will_message_payload: Last will message payload (optional)
- will_delay_interval: Last will delay interval (optional)
"""
host: str = "localhost"
port: int = 1883
ssl: Union[bool, SSLContext] = False
keepalive: int = 60
username: Optional[str] = None
password: Optional[str] = None
version: int = Field(default=MQTTv50, ge=4, le=5)
reconnect_retries: Optional[int] = 1
reconnect_delay: Optional[int] = 6
will_message_topic: Optional[str] = None
will_message_payload: Optional[str] = None
will_delay_interval: Optional[int] = None
Main FastMQTT client with customizable connection parameters and logging.
class FastMQTT:
def __init__(
self,
config: MQTTConfig,
*,
client_id: Optional[str] = None,
clean_session: bool = True,
optimistic_acknowledgement: bool = True,
mqtt_logger: Optional[logging.Logger] = None,
**kwargs: Any,
) -> None:
"""
Initialize FastMQTT client.
Parameters:
- config: MQTTConfig instance with connection parameters
- client_id: Unique client identifier (auto-generated if None)
- clean_session: Start with a clean (non-persistent) session; set to False to have the broker keep a persistent session (default: True)
- optimistic_acknowledgement: MQTT acknowledgement behavior (default: True)
- mqtt_logger: Custom logger instance (optional)
- **kwargs: Additional gmqtt client parameters
"""
Methods for establishing and managing MQTT broker connections with FastAPI lifecycle integration.
async def connection(self) -> None:
    """Establish the connection to the MQTT broker with authentication and configuration."""
async def mqtt_startup(self) -> None:
    """Initial connection method, awaited during FastAPI lifespan startup."""
async def mqtt_shutdown(self) -> None:
    """Final disconnection method, awaited during FastAPI lifespan shutdown."""
def init_app(self, fastapi_app) -> None:
"""Legacy method to add startup/shutdown event handlers (deprecated)."""
Publish messages to MQTT topics with quality of service and retention options.
def publish(
self,
message_or_topic: str,
payload: Any = None,
qos: int = 0,
retain: bool = False,
**kwargs,
) -> None:
"""
Publish message to MQTT broker.
Parameters:
- message_or_topic: Topic name
- payload: Message payload (any serializable type)
- qos: Quality of Service level (0, 1, or 2)
- retain: Retain message on broker (default: False)
- **kwargs: Additional publish parameters
"""
Subscribe to MQTT topics with pattern matching and quality of service configuration.
def subscribe(
    self,
    *topics,
    qos: int = 0,
    no_local: bool = False,
    retain_as_published: bool = False,
    retain_handling_options: int = 0,
    subscription_identifier: Any = None,
) -> Callable[..., Any]:
    """
    Decorator for subscribing to specific MQTT topics.

    The usage example decorates an async handler taking
    (client, topic, payload, qos, properties).

    Parameters:
    - *topics: Topic names (supports MQTT wildcards + and #)
    - qos: Quality of Service level (0, 1, or 2; default: 0)
    - no_local: Don't receive own published messages (MQTT 5.0, default: False)
    - retain_as_published: Retain flag handling (MQTT 5.0, default: False)
    - retain_handling_options: Retained message behavior (MQTT 5.0, default: 0)
    - subscription_identifier: Subscription identifier (MQTT 5.0, optional)

    Returns:
    Decorator function for message handler
    """
def unsubscribe(self, topic: str, **kwargs):
"""
Unsubscribe from MQTT topic.
Parameters:
- topic: Topic name to unsubscribe from
- **kwargs: Additional unsubscribe parameters
"""
Decorator methods for handling MQTT connection lifecycle and message events.
def on_connect(self) -> Callable[..., Any]:
    """
    Decorator for MQTT connection event handler.

    Handler signature:
        def handler(client: MQTTClient, flags: int, rc: int, properties: Any) -> Any
    """
def on_message(self) -> Callable[..., Any]:
    """
    Decorator for global MQTT message handler (all topics).

    Handler signature:
        async def handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any) -> Any
    """
def on_disconnect(self) -> Callable[..., Any]:
    """
    Decorator for MQTT disconnection event handler.

    Handler signature:
        def handler(client: MQTTClient, packet: bytes, exc: Optional[Exception]) -> Any
    """
def on_subscribe(self) -> Callable[..., Any]:
"""
Decorator for MQTT subscription acknowledgment handler.
Handler signature:
def handler(client: MQTTClient, mid: int, qos: int, properties: Any) -> Any
"""
Static method for matching MQTT topics against wildcard patterns.
@staticmethod
def match(topic: str, template: str) -> bool:
"""
Match MQTT topic against template with wildcards.
Parameters:
- topic: Actual topic name
- template: Template with wildcards (+ for single level, # for multi-level)
Returns:
True if topic matches template pattern
Supports:
- Single-level wildcard (+): matches one topic level
- Multi-level wildcard (#): matches multiple topic levels
- Shared subscriptions ($share/group/topic)
"""
from ssl import SSLContext
from typing import Any, Awaitable, Callable, Optional, Union
from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import MQTTv50
from pydantic import Field
# Handler type definitions
# Message handler: async callable taking (client, topic, payload, qos, properties).
MQTTMessageHandler = Callable[[MQTTClient, str, bytes, int, Any], Awaitable[Any]]
# Connection handler: callable taking (client, flags, rc, properties).
MQTTConnectionHandler = Callable[[MQTTClient, int, int, Any], Any]
# Subscription handler: callable taking (client, mid, qos, properties).
MQTTSubscriptionHandler = Callable[[MQTTClient, int, int, Any], Any]
# Disconnect handler: callable taking (client, packet, exc).
MQTTDisconnectHandler = Callable[[MQTTClient, bytes, Optional[Exception]], Any]
# Configuration types
Union[bool, SSLContext] # For ssl parameter
Field(default=MQTTv50, ge=4, le=5) # For version parameter with validation
FastAPI-MQTT propagates exceptions from the underlying gmqtt client. Common exceptions include:
Handle these in your application code:
try:
    # mqtt_startup() is awaited, so this snippet must run inside a coroutine.
    await fast_mqtt.mqtt_startup()
except Exception as e:
    # Broad catch for illustration; narrow it to the exceptions you expect.
    print(f"MQTT connection failed: {e}")