npx @tessl/cli install tessl/pypi-fastapi-mqtt@2.2.00
# FastAPI MQTT

FastAPI-MQTT provides MQTT integration for FastAPI applications. Built as a wrapper around the gmqtt module, it supports the MQTT 5.0 protocol and targets machine-to-machine communication in low-bandwidth environments, exposing decorator-based event handlers and publish/subscribe functionality.

## Package Information

- **Package Name**: fastapi-mqtt
- **Language**: Python
- **Installation**: `pip install fastapi-mqtt`

## Core Imports

```python
from fastapi_mqtt import FastMQTT, MQTTConfig
```

Full import with MQTTClient:

```python
from fastapi_mqtt import FastMQTT, MQTTConfig, MQTTClient
```

For handler type hints:

```python
from gmqtt import Client as MQTTClient
```

## Basic Usage

```python
from contextlib import asynccontextmanager
from typing import Any
from fastapi import FastAPI
from gmqtt import Client as MQTTClient
from fastapi_mqtt import FastMQTT, MQTTConfig

# Configure MQTT connection
mqtt_config = MQTTConfig(
    host="mqtt.broker.com",
    port=1883,
    keepalive=60,
    username="user",
    password="password"
)

# Create FastMQTT instance
fast_mqtt = FastMQTT(config=mqtt_config)

# FastAPI lifespan management
@asynccontextmanager
async def lifespan(app: FastAPI):
    await fast_mqtt.mqtt_startup()
    yield
    await fast_mqtt.mqtt_shutdown()

app = FastAPI(lifespan=lifespan)

# Connection handler
@fast_mqtt.on_connect()
def connect(client: MQTTClient, flags: int, rc: int, properties: Any):
    client.subscribe("/mqtt/data")
    print("Connected to MQTT broker")

# Topic-specific subscription
@fast_mqtt.subscribe("sensors/+/temperature", qos=1)
async def temperature_handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    temp_data = payload.decode()
    print(f"Temperature from {topic}: {temp_data}")

# Global message handler
@fast_mqtt.on_message()
async def message_handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):
    print(f"Message from {topic}: {payload.decode()}")

# API endpoint to publish messages
# Note: a plain path parameter does not match "/", so this route only handles single-level topics
@app.get("/publish/{topic}/{message}")
async def publish_message(topic: str, message: str):
    fast_mqtt.publish(topic, message)
    return {"status": "published", "topic": topic, "message": message}
```

## Architecture

FastAPI-MQTT follows a decorator-based pattern for event handling:

- **FastMQTT**: Main client class managing MQTT connections and subscriptions
- **MQTTConfig**: Pydantic-based configuration for connection parameters
- **Decorators**: Event handlers for connection, subscription, and message events
- **Handler Management**: Internal system for organizing user-defined callbacks
- **Topic Matching**: Built-in support for MQTT wildcards (`+` and `#`) and shared subscriptions

## Capabilities

### Configuration

MQTT connection configuration using a Pydantic BaseModel, with support for authentication, SSL/TLS, reconnection, and last will messages.

```python { .api }
class MQTTConfig:
    """
    MQTT connection configuration.

    Parameters:
    - host: MQTT broker hostname (default: "localhost")
    - port: MQTT broker port (default: 1883)
    - ssl: SSL/TLS configuration (bool or SSLContext, default: False)
    - keepalive: Keep-alive interval in seconds (default: 60)
    - username: Authentication username (optional)
    - password: Authentication password (optional)
    - version: MQTT protocol version (default: MQTTv50, range: 4-5)
    - reconnect_retries: Number of reconnection attempts (default: 1)
    - reconnect_delay: Delay between reconnections in seconds (default: 6)
    - will_message_topic: Last will message topic (optional)
    - will_message_payload: Last will message payload (optional)
    - will_delay_interval: Last will delay interval in seconds (optional)
    """
    host: str = "localhost"
    port: int = 1883
    ssl: Union[bool, SSLContext] = False
    keepalive: int = 60
    username: Optional[str] = None
    password: Optional[str] = None
    version: int = Field(default=MQTTv50, ge=4, le=5)
    reconnect_retries: Optional[int] = 1
    reconnect_delay: Optional[int] = 6
    will_message_topic: Optional[str] = None
    will_message_payload: Optional[str] = None
    will_delay_interval: Optional[int] = None
```

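As a minimal sketch of how the TLS and last-will fields fit together, the helper below assembles keyword arguments for `MQTTConfig` using only the standard library. The function name `build_secure_config_kwargs`, the broker hostname, the port 8883 convention, and the topic and payload values are illustrative assumptions; only the field names come from the listing above.

```python
import ssl
from typing import Any, Dict, Optional


def build_secure_config_kwargs(
    host: str,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble MQTTConfig keyword arguments for a TLS connection (hypothetical helper)."""
    # A default context verifies the broker certificate against the system CA store.
    tls_context = ssl.create_default_context()
    return {
        "host": host,
        "port": 8883,  # conventional MQTT-over-TLS port; adjust for your broker
        "ssl": tls_context,
        "keepalive": 60,
        "username": username,
        "password": password,
        # Last will: published by the broker if this client disconnects ungracefully.
        "will_message_topic": "clients/example/status",  # illustrative topic
        "will_message_payload": "offline",
        "will_delay_interval": 5,
    }


kwargs = build_secure_config_kwargs("mqtt.example.com", "user", "password")
```

With the package installed, the result would be passed on as `MQTTConfig(**kwargs)`.
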
### Client Initialization

Main FastMQTT client with customizable connection parameters and logging.

```python { .api }
class FastMQTT:
    def __init__(
        self,
        config: MQTTConfig,
        *,
        client_id: Optional[str] = None,
        clean_session: bool = True,
        optimistic_acknowledgement: bool = True,
        mqtt_logger: Optional[logging.Logger] = None,
        **kwargs: Any,
    ) -> None:
        """
        Initialize FastMQTT client.

        Parameters:
        - config: MQTTConfig instance with connection parameters
        - client_id: Unique client identifier (auto-generated if None)
        - clean_session: Start a clean session instead of resuming a persistent one (default: True)
        - optimistic_acknowledgement: MQTT acknowledgement behavior (default: True)
        - mqtt_logger: Custom logger instance (optional)
        - **kwargs: Additional gmqtt client parameters
        """
```

### Connection Management

Methods for establishing and managing MQTT broker connections with FastAPI lifecycle integration.

```python { .api }
async def connection(self) -> None:
    """Establish connection to MQTT broker with authentication and configuration."""

async def mqtt_startup(self) -> None:
    """Initial connection method for FastAPI lifespan startup."""

async def mqtt_shutdown(self) -> None:
    """Final disconnection method for FastAPI lifespan shutdown."""

def init_app(self, fastapi_app) -> None:
    """Legacy method to add startup/shutdown event handlers (deprecated)."""
```

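One way to pair `mqtt_startup` and `mqtt_shutdown` so that shutdown always runs is a small lifespan factory. The sketch below is written against any object exposing those two coroutine methods; the name `make_mqtt_lifespan` is hypothetical and not part of the package.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Callable


def make_mqtt_lifespan(mqtt: Any) -> Callable[[Any], Any]:
    """Build a lifespan handler for any object exposing mqtt_startup/mqtt_shutdown."""

    @asynccontextmanager
    async def lifespan(app: Any) -> AsyncIterator[None]:
        await mqtt.mqtt_startup()
        try:
            yield
        finally:
            # Runs even if an exception propagates while the application is running.
            await mqtt.mqtt_shutdown()

    return lifespan
```

Under these assumptions, the application would be created with `FastAPI(lifespan=make_mqtt_lifespan(fast_mqtt))`.
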
### Message Publishing

Publish messages to MQTT topics with quality of service and retention options.

```python { .api }
def publish(
    self,
    message_or_topic: str,
    payload: Any = None,
    qos: int = 0,
    retain: bool = False,
    **kwargs,
) -> None:
    """
    Publish message to MQTT broker.

    Parameters:
    - message_or_topic: Topic name
    - payload: Message payload (any serializable type)
    - qos: Quality of Service level (0, 1, or 2)
    - retain: Retain message on broker (default: False)
    - **kwargs: Additional publish parameters
    """
```

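Because the payload is described only as "any serializable type", it can help to serialize explicitly so that subscribers know what to expect. The following sketch encodes data as UTF-8 JSON and forwards it to any object with a `publish(topic, payload, qos=..., retain=...)` method matching the signature above. The helper name `publish_json` is an assumption for illustration.

```python
import json
from typing import Any


def publish_json(publisher: Any, topic: str, data: Any, qos: int = 0, retain: bool = False) -> bytes:
    """Serialize data as JSON and publish it (hypothetical helper)."""
    if qos not in (0, 1, 2):
        raise ValueError(f"qos must be 0, 1, or 2, got {qos}")
    # Encode once so the exact bytes sent are known to the caller.
    payload = json.dumps(data).encode("utf-8")
    publisher.publish(topic, payload, qos=qos, retain=retain)
    return payload
```

A call such as `publish_json(fast_mqtt, "sensors/kitchen/temperature", {"celsius": 21.5}, qos=1)` would then send the JSON bytes to the broker.
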
### Topic Subscription

Subscribe to MQTT topics with pattern matching and quality of service configuration.

```python { .api }
def subscribe(
    self,
    *topics,
    qos: int = 0,
    no_local: bool = False,
    retain_as_published: bool = False,
    retain_handling_options: int = 0,
    subscription_identifier: Any = None,
) -> Callable[..., Any]:
    """
    Decorator for subscribing to specific MQTT topics.

    Parameters:
    - *topics: Topic names (supports MQTT wildcards + and #)
    - qos: Quality of Service level (0, 1, or 2)
    - no_local: Don't receive own published messages (MQTT 5.0)
    - retain_as_published: Retain flag handling (MQTT 5.0)
    - retain_handling_options: Retained message behavior (MQTT 5.0)
    - subscription_identifier: Subscription identifier (MQTT 5.0)

    Returns:
        Decorator function for message handler
    """

def unsubscribe(self, topic: str, **kwargs):
    """
    Unsubscribe from MQTT topic.

    Parameters:
    - topic: Topic name to unsubscribe from
    - **kwargs: Additional unsubscribe parameters
    """
```

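Wildcards are only valid in certain positions of a topic filter: under the MQTT rules, `+` must occupy a whole level and `#` must be the final level on its own. The check below is a standalone sketch of those placement rules, useful for validating filters before passing them to `subscribe`. The function `is_valid_topic_filter` is hypothetical and not part of the package.

```python
def is_valid_topic_filter(topic_filter: str) -> bool:
    """Check MQTT wildcard placement rules for a subscription filter."""
    if not topic_filter:
        return False
    levels = topic_filter.split("/")
    for index, level in enumerate(levels):
        if "#" in level:
            # '#' must stand alone in its level and be the final level.
            if level != "#" or index != len(levels) - 1:
                return False
        elif "+" in level and level != "+":
            # '+' must occupy an entire level.
            return False
    return True
```

For example, `sensors/+/temperature` and `sensors/#` pass, while `sensors/#/temperature` and `sensors/temp+` are rejected.
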
### Event Handlers

Decorator methods for handling MQTT connection lifecycle and message events.

```python { .api }
def on_connect(self) -> Callable[..., Any]:
    """
    Decorator for MQTT connection event handler.

    Handler signature:
        def handler(client: MQTTClient, flags: int, rc: int, properties: Any) -> Any
    """

def on_message(self) -> Callable[..., Any]:
    """
    Decorator for global MQTT message handler (all topics).

    Handler signature:
        async def handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any) -> Any
    """

def on_disconnect(self) -> Callable[..., Any]:
    """
    Decorator for MQTT disconnection event handler.

    Handler signature:
        def handler(client: MQTTClient, packet: bytes, exc: Optional[Exception]) -> Any
    """

def on_subscribe(self) -> Callable[..., Any]:
    """
    Decorator for MQTT subscription acknowledgment handler.

    Handler signature:
        def handler(client: MQTTClient, mid: int, qos: int, properties: Any) -> Any
    """
```

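Message handlers receive the raw payload as `bytes`, so decoding and parsing are the handler's responsibility. The sketch below follows the message handler signature above and tolerates malformed payloads. It is shown undecorated so it runs on its own, and the `readings` store and the `sensors/<id>/temperature` topic layout are assumptions for illustration.

```python
import asyncio
from typing import Any, Dict, Optional

# Latest reading per sensor id (illustrative in-memory store).
readings: Dict[str, float] = {}


async def temperature_handler(
    client: Any, topic: str, payload: bytes, qos: int, properties: Any
) -> Optional[float]:
    """Parse a numeric payload from a topic shaped like sensors/<id>/temperature."""
    levels = topic.split("/")
    if len(levels) != 3:
        return None
    try:
        value = float(payload.decode("utf-8"))
    except (UnicodeDecodeError, ValueError):
        # Ignore payloads that are not valid UTF-8 numbers.
        return None
    readings[levels[1]] = value
    return value


result = asyncio.run(
    temperature_handler(None, "sensors/kitchen/temperature", b"21.5", 1, None)
)
```

In an application, the same function would be registered with `@fast_mqtt.subscribe("sensors/+/temperature", qos=1)`.
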
### Topic Pattern Matching

Static method for matching MQTT topics against wildcard patterns.

```python { .api }
@staticmethod
def match(topic: str, template: str) -> bool:
    """
    Match MQTT topic against template with wildcards.

    Parameters:
    - topic: Actual topic name
    - template: Template with wildcards (+ for single level, # for multi-level)

    Returns:
        True if topic matches template pattern

    Supports:
    - Single-level wildcard (+): matches one topic level
    - Multi-level wildcard (#): matches multiple topic levels
    - Shared subscriptions ($share/group/topic)
    """
```

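To make the matching semantics concrete, here is a standalone sketch of wildcard matching written from the rules listed above. It is not the library's implementation, the name `topic_matches` is hypothetical, and edge cases such as topics beginning with `$` are not handled.

```python
def topic_matches(topic: str, template: str) -> bool:
    """Match a concrete topic against a template with + and # wildcards."""
    # Shared subscriptions: drop the "$share/<group>/" prefix before matching.
    if template.startswith("$share/"):
        parts = template.split("/", 2)
        if len(parts) < 3:
            return False
        template = parts[2]

    topic_levels = topic.split("/")
    template_levels = template.split("/")
    for index, level in enumerate(template_levels):
        if level == "#":
            # '#' matches the remaining levels (including none) but must be last.
            return index == len(template_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # Without '#', both sides must have the same number of levels.
    return len(topic_levels) == len(template_levels)
```

Under this sketch, `sensors/kitchen/temperature` matches both `sensors/+/temperature` and `sensors/#`, but not `sensors/+`.
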
## Types

```python { .api }
from ssl import SSLContext
from typing import Any, Awaitable, Callable, Optional, Union
from gmqtt import Client as MQTTClient
from gmqtt.mqtt.constants import MQTTv50
from pydantic import Field

# Handler type definitions
MQTTMessageHandler = Callable[[MQTTClient, str, bytes, int, Any], Awaitable[Any]]
MQTTConnectionHandler = Callable[[MQTTClient, int, int, Any], Any]
MQTTSubscriptionHandler = Callable[[MQTTClient, int, int, Any], Any]
MQTTDisconnectHandler = Callable[[MQTTClient, bytes, Optional[Exception]], Any]

# Configuration types
Union[bool, SSLContext]  # For ssl parameter
Field(default=MQTTv50, ge=4, le=5)  # For version parameter with validation
```

## Error Handling

FastAPI-MQTT propagates exceptions from the underlying gmqtt client. Common failure categories include:

- **Connection errors**: Network connectivity issues, invalid broker address
- **Authentication failures**: Invalid username/password credentials
- **Protocol errors**: MQTT protocol version mismatches, malformed messages
- **Timeout errors**: Connection timeout, keep-alive timeout

Handle these in your application code:

```python
# Run inside an async function, such as the lifespan handler
try:
    await fast_mqtt.mqtt_startup()
except Exception as e:
    print(f"MQTT connection failed: {e}")
```

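Where a broker may be briefly unavailable at application start, the startup call can be wrapped in a retry loop. The following sketch uses exponential backoff around any zero-argument coroutine function. The name `start_with_retry` and its parameters are assumptions, and it is separate from the `reconnect_retries` setting in `MQTTConfig`.

```python
import asyncio
from typing import Awaitable, Callable


async def start_with_retry(
    startup: Callable[[], Awaitable[None]],
    attempts: int = 3,
    base_delay: float = 1.0,
) -> int:
    """Call startup until it succeeds; return the number of attempts used."""
    for attempt in range(1, attempts + 1):
        try:
            await startup()
            return attempt
        except Exception:
            if attempt == attempts:
                # Out of attempts: let the last error propagate to the caller.
                raise
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("attempts must be at least 1")
```

Inside a lifespan handler this could be used as `await start_with_retry(fast_mqtt.mqtt_startup)`.
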