tessl/pypi-fastapi-mqtt

FastAPI MQTT

FastAPI-MQTT integrates MQTT into FastAPI applications through decorator-based event handlers. It is a wrapper around the gmqtt module, supports the MQTT 5.0 protocol, and provides publish/subscribe functionality for machine-to-machine communication in low-bandwidth environments.

Package Information

  • Package Name: fastapi-mqtt
  • Language: Python
  • Installation: pip install fastapi-mqtt

Core Imports

from fastapi_mqtt import FastMQTT, MQTTConfig

Full import with MQTTClient:

from fastapi_mqtt import FastMQTT, MQTTConfig, MQTTClient

For handler type hints:

from gmqtt import Client as MQTTClient

Basic Usage

from contextlib import asynccontextmanager
from typing import Any
from fastapi import FastAPI
from gmqtt import Client as MQTTClient
from fastapi_mqtt import FastMQTT, MQTTConfig

# Configure MQTT connection
mqtt_config = MQTTConfig(
    host="mqtt.broker.com",
    port=1883,
    keepalive=60,
    username="user",
    password="password"
)

# Create FastMQTT instance
fast_mqtt = FastMQTT(config=mqtt_config)

# FastAPI lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
    await fast_mqtt.mqtt_startup()
    yield
    await fast_mqtt.mqtt_shutdown()

app = FastAPI(lifespan=lifespan)

# Connection handler
@fast_mqtt.on_connect()
def connect(client: MQTTClient, flags: int, rc: int, properties: Any):
    client.subscribe("/mqtt/data")
    print("Connected to MQTT broker")

# Topic-specific subscription
@fast_mqtt.subscribe("sensors/+/temperature", qos=1)
async def temperature_handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    temp_data = payload.decode()
    print(f"Temperature from {topic}: {temp_data}")

# Global message handler
@fast_mqtt.on_message()
async def message_handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print(f"Message from {topic}: {payload.decode()}")

# API endpoint to publish messages
@app.get("/publish/{topic}/{message}")
async def publish_message(topic: str, message: str):
    fast_mqtt.publish(topic, message)
    return {"status": "published", "topic": topic, "message": message}

Architecture

FastAPI-MQTT follows a decorator-based pattern for event handling:

  • FastMQTT: Main client class managing MQTT connections and subscriptions
  • MQTTConfig: Pydantic-based configuration for connection parameters
  • Decorators: Event handlers for connection, subscription, and message events
  • Handler Management: Internal system for organizing user-defined callbacks
  • Topic Matching: Built-in support for MQTT wildcards (+ and #) and shared subscriptions
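
The decorator pattern described above can be sketched in a few lines of plain Python. This is only an illustration of the idea, not the library's internals: `MiniRouter`, `dispatch`, and the two-argument handler signature are hypothetical names chosen for the example, and matching is exact-string only.

```python
from typing import Callable, Dict, List

Handler = Callable[[str, bytes], None]


class MiniRouter:
    """Hypothetical stand-in for a decorator-based handler registry."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Handler]] = {}

    def subscribe(self, topic: str) -> Callable[[Handler], Handler]:
        # subscribe("a/b") returns the actual decorator, which stores the
        # function under that topic and hands it back unchanged.
        def decorator(func: Handler) -> Handler:
            self.handlers.setdefault(topic, []).append(func)
            return func

        return decorator

    def dispatch(self, topic: str, payload: bytes) -> int:
        # Exact-match lookup only; wildcard matching is left out of this sketch.
        called = 0
        for func in self.handlers.get(topic, []):
            func(topic, payload)
            called += 1
        return called


router = MiniRouter()
received: List[str] = []


@router.subscribe("sensors/kitchen/temperature")
def on_temperature(topic: str, payload: bytes) -> None:
    received.append(f"{topic}={payload.decode()}")


router.dispatch("sensors/kitchen/temperature", b"21.5")
```

Because the decorator returns the function unchanged, a handler registered this way can still be called directly, which is convenient in unit tests.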

Capabilities

Configuration

MQTTConfig is a Pydantic BaseModel that holds the connection settings: authentication, SSL/TLS, reconnection, and last will messages.

class MQTTConfig:
    """
    MQTT connection configuration.
    
    Parameters:
    - host: MQTT broker hostname (default: "localhost")
    - port: MQTT broker port (default: 1883)
    - ssl: SSL/TLS configuration (bool or SSLContext, default: False)
    - keepalive: Keep-alive interval in seconds (default: 60)
    - username: Authentication username (optional)
    - password: Authentication password (optional)
    - version: MQTT protocol version (default: MQTTv50; allowed values: 4 for MQTT 3.1.1, 5 for MQTT 5.0)
    - reconnect_retries: Number of reconnection attempts (default: 1)
    - reconnect_delay: Delay between reconnections in seconds (default: 6)
    - will_message_topic: Last will message topic (optional)
    - will_message_payload: Last will message payload (optional)
    - will_delay_interval: Last will delay interval (optional)
    """
    host: str = "localhost"
    port: int = 1883
    ssl: Union[bool, SSLContext] = False
    keepalive: int = 60
    username: Optional[str] = None
    password: Optional[str] = None
    version: int = Field(default=MQTTv50, ge=4, le=5)
    reconnect_retries: Optional[int] = 1
    reconnect_delay: Optional[int] = 6
    will_message_topic: Optional[str] = None
    will_message_payload: Optional[str] = None
    will_delay_interval: Optional[int] = None
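
As an illustration of how these fields might be combined, the sketch below assembles settings for a TLS connection with a last will message. The helper name `build_secure_settings`, the topic layout `clients/<name>/status`, and the chosen values are assumptions made for the example; only the dictionary keys come from the field list above. The final `MQTTConfig(**settings)` call is left as a comment so the snippet runs with the standard library alone.

```python
import ssl
from typing import Any, Dict


def build_secure_settings(host: str, client_name: str) -> Dict[str, Any]:
    # create_default_context() verifies the broker certificate against the system CA store.
    tls_context = ssl.create_default_context()
    return {
        "host": host,
        "port": 8883,  # registered port for MQTT over TLS
        "ssl": tls_context,
        "keepalive": 60,
        "reconnect_retries": 5,
        "reconnect_delay": 6,
        # Last will: the broker publishes this if the client disappears unexpectedly.
        "will_message_topic": f"clients/{client_name}/status",
        "will_message_payload": "offline",
        "will_delay_interval": 2,
    }


settings = build_secure_settings("mqtt.broker.com", "api-server")
# With the package installed, the keys map onto the fields listed above:
# mqtt_config = MQTTConfig(**settings)
```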

Client Initialization

Main FastMQTT client with customizable connection parameters and logging.

class FastMQTT:
    def __init__(
        self,
        config: MQTTConfig,
        *,
        client_id: Optional[str] = None,
        clean_session: bool = True,
        optimistic_acknowledgement: bool = True,
        mqtt_logger: Optional[logging.Logger] = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize FastMQTT client.
        
        Parameters:
        - config: MQTTConfig instance with connection parameters
        - client_id: Unique client identifier (auto-generated if None)
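        - clean_session: Start with a clean session, discarding any state the broker stored for this client; set to False to request a persistent session (default: True)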
        - optimistic_acknowledgement: If True, acknowledge incoming messages before the message handler runs; if False, acknowledge after the handler returns (default: True)
        - mqtt_logger: Custom logger instance (optional)
        - **kwargs: Additional gmqtt client parameters
        """

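A broker keys stored session state by client ID, so requesting a persistent session only makes sense with an ID that stays the same across restarts. The sketch below builds such keyword arguments; `build_client_options` and the ID scheme (service name plus hostname) are hypothetical choices, and the `FastMQTT(...)` call is shown as a comment so the snippet runs without the package.

```python
import logging
import socket
from typing import Any, Dict


def build_client_options(service: str) -> Dict[str, Any]:
    # Stable ID: the same service on the same host always reconnects under one name.
    client_id = f"{service}-{socket.gethostname()}"
    logger = logging.getLogger(f"{service}.mqtt")
    logger.setLevel(logging.INFO)
    return {
        "client_id": client_id,
        "clean_session": False,  # ask the broker to keep session state
        "mqtt_logger": logger,
    }


options = build_client_options("inventory-api")
# With the package installed:
# fast_mqtt = FastMQTT(config=mqtt_config, **options)
```
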
Connection Management

Methods for establishing and managing MQTT broker connections with FastAPI lifecycle integration.

async def connection(self) -> None:
    """Establish connection to MQTT broker with authentication and configuration."""

async def mqtt_startup(self) -> None:
    """Initial connection method for FastAPI lifespan startup."""

async def mqtt_shutdown(self) -> None:
    """Final disconnection method for FastAPI lifespan shutdown."""

def init_app(self, fastapi_app) -> None:
    """Legacy method to add startup/shutdown event handlers (deprecated)."""

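One way to make the shutdown step robust is to wrap the `yield` in `try`/`finally`, so that `mqtt_shutdown` is still attempted when the application body raises. The sketch below demonstrates the ordering with `StubMQTT`, a hypothetical stand-in that exposes only the two lifecycle methods listed above; in an application the real `FastMQTT` instance would take its place.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List


class StubMQTT:
    """Hypothetical stand-in exposing the two documented lifecycle methods."""

    def __init__(self) -> None:
        self.events: List[str] = []

    async def mqtt_startup(self) -> None:
        self.events.append("startup")

    async def mqtt_shutdown(self) -> None:
        self.events.append("shutdown")


stub_mqtt = StubMQTT()


@asynccontextmanager
async def lifespan(app: object) -> AsyncIterator[None]:
    await stub_mqtt.mqtt_startup()
    try:
        yield  # the application serves requests while suspended here
    finally:
        # Runs even if the application body raises.
        await stub_mqtt.mqtt_shutdown()


async def run_app_that_fails() -> None:
    try:
        async with lifespan(app=None):
            raise RuntimeError("simulated crash while serving")
    except RuntimeError:
        pass


asyncio.run(run_app_that_fails())
```
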
Message Publishing

Publish messages to MQTT topics with quality of service and retention options.

def publish(
    self,
    message_or_topic: str,
    payload: Any = None,
    qos: int = 0,
    retain: bool = False,
    **kwargs,
) -> None:
    """
    Publish message to MQTT broker.
    
    Parameters:
    - message_or_topic: Topic name, or a gmqtt Message object
    - payload: Message payload (any serializable type)
    - qos: Quality of Service level (0, 1, or 2)
    - retain: Retain message on broker (default: False)
    - **kwargs: Additional publish parameters
    """

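For structured data it can help to serialize explicitly before publishing, so the bytes on the wire do not depend on how the client encodes Python objects. The helper below is a sketch: `publish_json` and `recording_publish` are hypothetical names, and `recording_publish` merely mimics the `publish` signature above so the example runs without a broker. In an application, `fast_mqtt.publish` would be passed instead.

```python
import json
from typing import Any, Callable, Dict, List, Tuple


def publish_json(
    publish: Callable[..., None], topic: str, data: Dict[str, Any], qos: int = 1
) -> bytes:
    # Compact separators and sorted keys give a deterministic payload.
    payload = json.dumps(data, separators=(",", ":"), sort_keys=True).encode("utf-8")
    publish(topic, payload, qos=qos, retain=False)
    return payload


sent: List[Tuple[str, bytes, int, bool]] = []


def recording_publish(
    message_or_topic: str,
    payload: Any = None,
    qos: int = 0,
    retain: bool = False,
    **kwargs: Any,
) -> None:
    # Stand-in for the real publish method: records the call instead of sending it.
    sent.append((message_or_topic, payload, qos, retain))


publish_json(recording_publish, "sensors/kitchen/temperature", {"unit": "C", "value": 21.5})
```
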
Topic Subscription

Subscribe to MQTT topics with pattern matching and quality of service configuration.

def subscribe(
    self,
    *topics,
    qos: int = 0,
    no_local: bool = False,
    retain_as_published: bool = False,
    retain_handling_options: int = 0,
    subscription_identifier: Any = None,
) -> Callable[..., Any]:
    """
    Decorator for subscribing to specific MQTT topics.
    
    Parameters:
    - *topics: Topic names (supports MQTT wildcards + and #)
    - qos: Quality of Service level (0, 1, or 2)
    - no_local: Don't receive own published messages (MQTT 5.0)
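    - retain_as_published: Keep the RETAIN flag as set by the publisher on messages forwarded to this subscription (MQTT 5.0)
    - retain_handling_options: When to send retained messages: 0 = on every subscribe, 1 = only if the subscription is new, 2 = never (MQTT 5.0)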
    - subscription_identifier: Subscription identifier (MQTT 5.0)
    
    Returns:
    Decorator function for message handler
    """

def unsubscribe(self, topic: str, **kwargs):
    """
    Unsubscribe from MQTT topic.
    
    Parameters:
    - topic: Topic name to unsubscribe from
    - **kwargs: Additional unsubscribe parameters
    """

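A handler registered for a wildcard topic usually needs to recover the level that the wildcard matched. The sketch below does this for the `sensors/+/temperature` pattern from Basic Usage and ignores malformed payloads. The names `sensor_id_from_topic` and `readings` are hypothetical, and the handler is called directly here; in an application the `subscribe` decorator would register it.

```python
import asyncio
from typing import Any, Dict, Optional

readings: Dict[str, float] = {}


def sensor_id_from_topic(topic: str) -> Optional[str]:
    # For "sensors/+/temperature", the "+" level is the second segment.
    parts = topic.split("/")
    if len(parts) == 3 and parts[0] == "sensors" and parts[2] == "temperature":
        return parts[1]
    return None


async def temperature_handler(
    client: Any, topic: str, payload: bytes, qos: int, properties: Any
) -> None:
    sensor_id = sensor_id_from_topic(topic)
    if sensor_id is None:
        return
    try:
        readings[sensor_id] = float(payload.decode("utf-8"))
    except (UnicodeDecodeError, ValueError):
        # Skip malformed payloads rather than letting the handler raise.
        pass


# Direct calls for illustration only.
asyncio.run(temperature_handler(None, "sensors/kitchen/temperature", b"21.5", 1, {}))
asyncio.run(temperature_handler(None, "sensors/garage/temperature", b"not-a-number", 1, {}))
```
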
Event Handlers

Decorator methods for handling MQTT connection lifecycle and message events.

def on_connect(self) -> Callable[..., Any]:
    """
    Decorator for MQTT connection event handler.
    
    Handler signature:
    def handler(client: MQTTClient, flags: int, rc: int, properties: Any) -> Any
    """

def on_message(self) -> Callable[..., Any]:
    """
    Decorator for global MQTT message handler (all topics).
    
    Handler signature:
    async def handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any) -> Any
    """

def on_disconnect(self) -> Callable[..., Any]:
    """
    Decorator for MQTT disconnection event handler.
    
    Handler signature:
    def handler(client: MQTTClient, packet: bytes, exc: Optional[Exception]) -> Any
    """

def on_subscribe(self) -> Callable[..., Any]:
    """
    Decorator for MQTT subscription acknowledgment handler.
    
    Handler signature:
    def handler(client: MQTTClient, mid: int, qos: int, properties: Any) -> Any
    """

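The connect and disconnect signatures above are enough to track connection state, for example to back a health-check endpoint. The class below is a sketch with hypothetical names (`ConnectionState`, `handle_connect`, `handle_disconnect`); the handlers are invoked directly so the snippet runs without a broker, and registering them through `on_connect()` and `on_disconnect()` is assumed rather than shown.

```python
from typing import Any, List, Optional


class ConnectionState:
    def __init__(self) -> None:
        self.connected = False
        self.history: List[str] = []

    def handle_connect(self, client: Any, flags: int, rc: int, properties: Any) -> None:
        # A reason code of 0 means the broker accepted the connection.
        self.connected = rc == 0
        self.history.append(f"connect rc={rc}")

    def handle_disconnect(
        self, client: Any, packet: bytes, exc: Optional[Exception] = None
    ) -> None:
        self.connected = False
        reason = type(exc).__name__ if exc else "clean"
        self.history.append(f"disconnect {reason}")


state = ConnectionState()
# Direct calls for illustration only.
state.handle_connect(None, 0, 0, {})
state.handle_disconnect(None, b"", ConnectionResetError())
```
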
Topic Pattern Matching

Static method for matching MQTT topics against wildcard patterns.

@staticmethod
def match(topic: str, template: str) -> bool:
    """
    Match MQTT topic against template with wildcards.
    
    Parameters:
    - topic: Actual topic name
    - template: Template with wildcards (+ for single level, # for multi-level)
    
    Returns:
    True if topic matches template pattern
    
    Supports:
    - Single-level wildcard (+): matches one topic level
    - Multi-level wildcard (#): matches multiple topic levels
    - Shared subscriptions ($share/group/topic)
    """

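To make the wildcard rules concrete, here is a standalone matcher written for illustration. It is not the library's implementation: `topic_matches` is a hypothetical function, and it does not handle the MQTT rule that wildcards should not match topics beginning with `$`.

```python
from typing import List


def topic_matches(topic: str, template: str) -> bool:
    """Illustrative matcher for + and # wildcards and $share prefixes."""
    template_levels: List[str] = template.split("/")
    # Shared subscriptions ("$share/<group>/<filter>") match on the filter part only.
    if template_levels[0] == "$share" and len(template_levels) >= 3:
        template_levels = template_levels[2:]
    topic_levels = topic.split("/")

    for index, level in enumerate(template_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent level and everything below it.
            return index == len(template_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # No "#" was seen, so the topic must have exactly as many levels as the template.
    return len(topic_levels) == len(template_levels)
```

With this sketch, `sensors/+/temperature` matches `sensors/kitchen/temperature`, `sensors/#` matches both `sensors` and `sensors/a/b`, and `sensors/+` does not match `sensors/a/b`.
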
Types

from ssl import SSLContext
from typing import Any, Awaitable, Callable, Optional, Union
from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import MQTTv50
from pydantic import Field

# Handler type definitions
MQTTMessageHandler = Callable[[MQTTClient, str, bytes, int, Any], Awaitable[Any]]
MQTTConnectionHandler = Callable[[MQTTClient, int, int, Any], Any]
MQTTSubscriptionHandler = Callable[[MQTTClient, int, int, Any], Any]
MQTTDisconnectHandler = Callable[[MQTTClient, bytes, Optional[Exception]], Any]

# Configuration types
Union[bool, SSLContext]  # For ssl parameter
Field(default=MQTTv50, ge=4, le=5)  # For version parameter with validation

Error Handling

FastAPI-MQTT propagates exceptions raised by the underlying gmqtt client. Common failure categories include:

  • Connection errors: Network connectivity issues, invalid broker address
  • Authentication failures: Invalid username/password credentials
  • Protocol errors: MQTT protocol version mismatches, malformed messages
  • Timeout errors: Connection timeout, keep-alive timeout

Handle these in your application code:

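# await is only valid inside a coroutine, such as the lifespan function
async def start_mqtt() -> None:
    try:
        await fast_mqtt.mqtt_startup()
    except Exception as e:
        print(f"MQTT connection failed: {e}")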
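
When the broker may be briefly unavailable at application start, one option is to retry the startup call with an exponential backoff before giving up. The sketch below is an assumption-laden example: `start_with_retry` and `flaky_startup` are hypothetical names, and `flaky_startup` stands in for `fast_mqtt.mqtt_startup` so the snippet runs without a broker.

```python
import asyncio
from typing import Awaitable, Callable


async def start_with_retry(
    startup: Callable[[], Awaitable[None]], attempts: int = 3, base_delay: float = 0.01
) -> int:
    """Call startup() until it succeeds; return the number of attempts used."""
    for attempt in range(1, attempts + 1):
        try:
            await startup()
            return attempt
        except Exception:
            if attempt == attempts:
                raise  # out of retries: let the caller decide what to do
            # Double the wait after each failed attempt.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")


calls = {"count": 0}


async def flaky_startup() -> None:
    # Hypothetical stand-in that fails twice, then succeeds.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("broker unreachable")


attempts_used = asyncio.run(start_with_retry(flaky_startup))
```

In a real application the base delay would be on the order of seconds rather than the 10 ms used here to keep the example fast.
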

Describes: pkg:pypi/fastapi-mqtt@2.2.x