tessl/pypi-fastapi-mqtt

FastAPI-MQTT provides MQTT integration for FastAPI applications with decorator-based event handlers

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/fastapi-mqtt@2.2.x

To install, run

npx @tessl/cli install tessl/pypi-fastapi-mqtt@2.2.0


FastAPI MQTT

FastAPI-MQTT provides MQTT integration for FastAPI applications. Built as a wrapper around the gmqtt module, it supports the MQTT 5.0 protocol and offers decorator-based callback handlers together with publish/subscribe functionality, enabling machine-to-machine communication in low-bandwidth environments.

Package Information

  • Package Name: fastapi-mqtt
  • Language: Python
  • Installation: pip install fastapi-mqtt

Core Imports

from fastapi_mqtt import FastMQTT, MQTTConfig

Full import with MQTTClient:

from fastapi_mqtt import FastMQTT, MQTTConfig, MQTTClient

For handler type hints:

from gmqtt import Client as MQTTClient

Basic Usage

from contextlib import asynccontextmanager
from typing import Any
from fastapi import FastAPI
from gmqtt import Client as MQTTClient
from fastapi_mqtt import FastMQTT, MQTTConfig

# Configure MQTT connection
mqtt_config = MQTTConfig(
    host="mqtt.broker.com",
    port=1883,
    keepalive=60,
    username="user",
    password="password"
)

# Create FastMQTT instance
fast_mqtt = FastMQTT(config=mqtt_config)

# FastAPI lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
    await fast_mqtt.mqtt_startup()
    yield
    await fast_mqtt.mqtt_shutdown()

app = FastAPI(lifespan=lifespan)

# Connection handler
@fast_mqtt.on_connect()
def connect(client: MQTTClient, flags: int, rc: int, properties: Any):
    client.subscribe("/mqtt/data")
    print("Connected to MQTT broker")

# Topic-specific subscription
@fast_mqtt.subscribe("sensors/+/temperature", qos=1)
async def temperature_handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    temp_data = payload.decode()
    print(f"Temperature from {topic}: {temp_data}")

# Global message handler
@fast_mqtt.on_message()
async def message_handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print(f"Message from {topic}: {payload.decode()}")

# API endpoint to publish messages
@app.get("/publish/{topic}/{message}")
async def publish_message(topic: str, message: str):
    fast_mqtt.publish(topic, message)
    return {"status": "published", "topic": topic, "message": message}

Architecture

FastAPI-MQTT follows a decorator-based pattern for event handling. Its main components are:

  • FastMQTT: Main client class managing MQTT connections and subscriptions
  • MQTTConfig: Pydantic-based configuration for connection parameters
  • Decorators: Event handlers for connection, subscription, and message events
  • Handler Management: Internal system for organizing user-defined callbacks
  • Topic Matching: Built-in support for MQTT wildcards (+ and #) and shared subscriptions
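
The decorator pattern listed above can be illustrated with a small, library-independent sketch. `MiniBus`, `dispatch`, and `on_temperature` are hypothetical names invented for this example; this is not FastMQTT's internal code, only an illustration of how a decorator call can record a callback against a topic.

```python
from typing import Callable, Dict, List

Handler = Callable[[str, bytes], None]


class MiniBus:
    """Hypothetical stand-in for handler bookkeeping; not FastMQTT's code."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = {}

    def subscribe(self, topic: str) -> Callable[[Handler], Handler]:
        # subscribe(...) returns the actual decorator, which records the
        # function under the topic and hands it back unchanged.
        def decorator(func: Handler) -> Handler:
            self._handlers.setdefault(topic, []).append(func)
            return func

        return decorator

    def dispatch(self, topic: str, payload: bytes) -> int:
        # Call every handler registered for this exact topic string.
        handlers = self._handlers.get(topic, [])
        for handler in handlers:
            handler(topic, payload)
        return len(handlers)


bus = MiniBus()
received: List[str] = []


@bus.subscribe("sensors/kitchen/temperature")
def on_temperature(topic: str, payload: bytes) -> None:
    received.append(payload.decode())


bus.dispatch("sensors/kitchen/temperature", b"21.5")
```

The sketch only matches exact topic strings; wildcard matching is a separate concern.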

Capabilities

Configuration

MQTT connection configuration using Pydantic BaseModel with support for authentication, SSL, reconnection, and last will messages.

class MQTTConfig(BaseModel):
    """
    MQTT connection configuration.
    
    Parameters:
    - host: MQTT broker hostname (default: "localhost")
    - port: MQTT broker port (default: 1883)
    - ssl: SSL/TLS configuration (bool or SSLContext, default: False)
    - keepalive: Keep-alive interval in seconds (default: 60)
    - username: Authentication username (optional)
    - password: Authentication password (optional)
    - version: MQTT protocol version (default: MQTTv50, range: 4-5)
    - reconnect_retries: Number of reconnection attempts (default: 1)
    - reconnect_delay: Delay between reconnections in seconds (default: 6)
    - will_message_topic: Last will message topic (optional)
    - will_message_payload: Last will message payload (optional)
    - will_delay_interval: Last will delay interval (optional)
    """
    host: str = "localhost"
    port: int = 1883
    ssl: Union[bool, SSLContext] = False
    keepalive: int = 60
    username: Optional[str] = None
    password: Optional[str] = None
    version: int = Field(default=MQTTv50, ge=4, le=5)
    reconnect_retries: Optional[int] = 1
    reconnect_delay: Optional[int] = 6
    will_message_topic: Optional[str] = None
    will_message_payload: Optional[str] = None
    will_delay_interval: Optional[int] = None
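
As an illustration of the `ssl` and last-will fields, the sketch below prepares keyword arguments for a TLS connection using only the standard library. The helper name `build_tls_settings`, the port 8883, and the will topic are assumptions made for this example, not values taken from the package.

```python
import ssl
from typing import Any, Dict, Optional


def build_tls_settings(host: str, ca_file: Optional[str] = None) -> Dict[str, Any]:
    """Collect keyword arguments matching the MQTTConfig fields listed above."""
    # create_default_context() turns on certificate and hostname verification.
    context = ssl.create_default_context(cafile=ca_file)
    return {
        "host": host,
        "port": 8883,  # assumption: the broker listens for TLS on 8883
        "ssl": context,
        "will_message_topic": "devices/api/status",  # hypothetical topic
        "will_message_payload": "offline",
        "will_delay_interval": 10,
    }


settings = build_tls_settings("mqtt.broker.com")
```

The resulting dictionary would then be passed as `MQTTConfig(**settings)`.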

Client Initialization

Main FastMQTT client with customizable connection parameters and logging.

class FastMQTT:
    def __init__(
        self,
        config: MQTTConfig,
        *,
        client_id: Optional[str] = None,
        clean_session: bool = True,
        optimistic_acknowledgement: bool = True,
        mqtt_logger: Optional[logging.Logger] = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize FastMQTT client.
        
        Parameters:
        - config: MQTTConfig instance with connection parameters
        - client_id: Unique client identifier (auto-generated if None)
        - clean_session: Start a clean session, discarding any stored session state; set to False for a persistent session (default: True)
        - optimistic_acknowledgement: If True, acknowledge received messages before the handler runs rather than after it completes (default: True)
        - mqtt_logger: Custom logger instance (optional)
        - **kwargs: Additional gmqtt client parameters
        """

Connection Management

Methods for establishing and managing MQTT broker connections with FastAPI lifecycle integration.

async def connection(self) -> None:
    """Establish connection to MQTT broker with authentication and configuration."""

async def mqtt_startup(self) -> None:
    """Initial connection method for FastAPI lifespan startup."""

async def mqtt_shutdown(self) -> None:
    """Final disconnection method for FastAPI lifespan shutdown."""

def init_app(self, fastapi_app) -> None:
    """Legacy method to add startup/shutdown event handlers (deprecated)."""

Message Publishing

Publish messages to MQTT topics with quality of service and retention options.

def publish(
    self,
    message_or_topic: str,
    payload: Any = None,
    qos: int = 0,
    retain: bool = False,
    **kwargs,
) -> None:
    """
    Publish message to MQTT broker.
    
    Parameters:
    - message_or_topic: Topic name
    - payload: Message payload (any serializable type)
    - qos: Quality of Service level (0, 1, or 2)
    - retain: Retain message on broker (default: False)
    - **kwargs: Additional publish parameters
    """

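Structured data is commonly serialized before publishing. The following is a minimal sketch using JSON from the standard library; `encode_reading` and `decode_reading` are hypothetical helpers, and the field names are invented for illustration.

```python
import json
from typing import Any, Dict


def encode_reading(reading: Dict[str, Any]) -> str:
    # Compact separators keep the message small on low-bandwidth links;
    # sort_keys makes the output deterministic.
    return json.dumps(reading, separators=(",", ":"), sort_keys=True)


def decode_reading(payload: bytes) -> Dict[str, Any]:
    # Message handlers receive the payload as bytes.
    return json.loads(payload.decode("utf-8"))


message = encode_reading({"sensor": "kitchen", "celsius": 21.5})
```

The encoded string could then be sent with `fast_mqtt.publish("sensors/kitchen/temperature", message, qos=1)`, and `decode_reading(payload)` used inside a message handler.
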
Topic Subscription

Subscribe to MQTT topics with pattern matching and quality of service configuration.

def subscribe(
    self,
    *topics,
    qos: int = 0,
    no_local: bool = False,
    retain_as_published: bool = False,
    retain_handling_options: int = 0,
    subscription_identifier: Any = None,
) -> Callable[..., Any]:
    """
    Decorator for subscribing to specific MQTT topics.
    
    Parameters:
    - *topics: Topic names (supports MQTT wildcards + and #)
    - qos: Quality of Service level (0, 1, or 2)
    - no_local: Don't receive own published messages (MQTT 5.0)
    - retain_as_published: Retain flag handling (MQTT 5.0)
    - retain_handling_options: Retained message behavior (MQTT 5.0)
    - subscription_identifier: Subscription identifier (MQTT 5.0)
    
    Returns:
    Decorator function for message handler
    """

def unsubscribe(self, topic: str, **kwargs):
    """
    Unsubscribe from MQTT topic.
    
    Parameters:
    - topic: Topic name to unsubscribe from
    - **kwargs: Additional unsubscribe parameters
    """

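When a handler is subscribed with a single-level wildcard such as `sensors/+/temperature`, it usually needs to recover the level that `+` matched. A minimal sketch, assuming that three-level topic layout; `sensor_id_from_topic` is a hypothetical helper, not part of the package.

```python
from typing import Optional


def sensor_id_from_topic(topic: str) -> Optional[str]:
    """Return the level matched by `+` in sensors/+/temperature, else None."""
    levels = topic.split("/")
    if len(levels) == 3 and levels[0] == "sensors" and levels[2] == "temperature":
        return levels[1]
    return None
```

Inside a handler, `sensor_id_from_topic(topic)` would give the sensor name for each incoming message.
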
Event Handlers

Decorator methods for handling MQTT connection lifecycle and message events.

def on_connect(self) -> Callable[..., Any]:
    """
    Decorator for MQTT connection event handler.
    
    Handler signature:
    def handler(client: MQTTClient, flags: int, rc: int, properties: Any) -> Any
    """

def on_message(self) -> Callable[..., Any]:
    """
    Decorator for global MQTT message handler (all topics).
    
    Handler signature:
    async def handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any) -> Any
    """

def on_disconnect(self) -> Callable[..., Any]:
    """
    Decorator for MQTT disconnection event handler.
    
    Handler signature:
    def handler(client: MQTTClient, packet: bytes, exc: Optional[Exception]) -> Any
    """

def on_subscribe(self) -> Callable[..., Any]:
    """
    Decorator for MQTT subscription acknowledgment handler.
    
    Handler signature:
    def handler(client: MQTTClient, mid: int, qos: int, properties: Any) -> Any
    """

Topic Pattern Matching

Static method for matching MQTT topics against wildcard patterns.

@staticmethod
def match(topic: str, template: str) -> bool:
    """
    Match MQTT topic against template with wildcards.
    
    Parameters:
    - topic: Actual topic name
    - template: Template with wildcards (+ for single level, # for multi-level)
    
    Returns:
    True if topic matches template pattern
    
    Supports:
    - Single-level wildcard (+): matches one topic level
    - Multi-level wildcard (#): matches multiple topic levels
    - Shared subscriptions ($share/group/topic)
    """

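The wildcard rules described above can be sketched in plain Python. This is an independent illustration of the matching rules, not the implementation used by `FastMQTT.match`; the function name `topic_matches` is hypothetical.

```python
def topic_matches(topic: str, template: str) -> bool:
    """Return True if `topic` matches the MQTT topic filter `template`."""
    template_levels = template.split("/")
    # Shared subscription filters look like $share/<group>/<filter>;
    # only the trailing filter takes part in matching.
    if template_levels[0] == "$share" and len(template_levels) > 2:
        template_levels = template_levels[2:]
    topic_levels = topic.split("/")
    for index, level in enumerate(template_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and everything below.
            return True
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # Without a trailing '#', both sides must have the same number of levels.
    return len(topic_levels) == len(template_levels)
```

For example, `topic_matches("sensors/kitchen/temperature", "sensors/+/temperature")` is true, while a topic with an extra trailing level is not matched by that filter.
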
Types

from ssl import SSLContext
from typing import Any, Awaitable, Callable, Optional, Union
from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import MQTTv50
from pydantic import BaseModel, Field

# Handler type definitions
MQTTMessageHandler = Callable[[MQTTClient, str, bytes, int, Any], Awaitable[Any]]
MQTTConnectionHandler = Callable[[MQTTClient, int, int, Any], Any]
MQTTSubscriptionHandler = Callable[[MQTTClient, int, int, Any], Any]
MQTTDisconnectHandler = Callable[[MQTTClient, bytes, Optional[Exception]], Any]

# Configuration types
Union[bool, SSLContext]  # For ssl parameter
Field(default=MQTTv50, ge=4, le=5)  # For version parameter with validation

Error Handling

FastAPI-MQTT propagates exceptions from the underlying gmqtt client. Common failure categories include:

  • Connection errors: Network connectivity issues, invalid broker address
  • Authentication failures: Invalid username/password credentials
  • Protocol errors: MQTT protocol version mismatches, malformed messages
  • Timeout errors: Connection timeout, keep-alive timeout

Handle these in your application code:

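# mqtt_startup() is a coroutine, so it is awaited inside an async function
# (start_mqtt is an illustrative name)
async def start_mqtt() -> None:
    try:
        await fast_mqtt.mqtt_startup()
    except Exception as e:
        print(f"MQTT connection failed: {e}")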
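
Connection errors are often transient, so one option is to retry startup with a growing delay. The sketch below is a generic helper built on `asyncio`; `connect_with_retry` and its parameters are hypothetical and not part of fastapi-mqtt.

```python
import asyncio
from typing import Awaitable, Callable


async def connect_with_retry(
    connect: Callable[[], Awaitable[None]],
    attempts: int = 3,
    base_delay: float = 1.0,
) -> int:
    """Await `connect` until it succeeds; return the number of attempts used."""
    for attempt in range(1, attempts + 1):
        try:
            await connect()
            return attempt
        except Exception:
            if attempt == attempts:
                raise  # out of retries: let the caller decide what to do
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")
```

Inside a lifespan function this could be called as `await connect_with_retry(fast_mqtt.mqtt_startup)`. Catching `Exception` mirrors the broad handler above; narrowing it to specific exception types is preferable once the failure modes of a given broker are known.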