or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/pypi-fastapi-mqtt

FastAPI-MQTT provides MQTT integration for FastAPI applications with decorator-based event handlers

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/fastapi-mqtt@2.2.x

To install, run

npx @tessl/cli install tessl/pypi-fastapi-mqtt@2.2.0

0

# FastAPI MQTT

1

2

FastAPI-MQTT provides MQTT integration for FastAPI applications with decorator-based event handlers. Built as a wrapper around the gmqtt module, it supports MQTT version 5.0 protocol and enables machine-to-machine communication in low bandwidth environments with decorator-based callback methods and publish/subscribe functionality.

3

4

## Package Information

5

6

- **Package Name**: fastapi-mqtt

7

- **Language**: Python

8

- **Installation**: `pip install fastapi-mqtt`

9

10

## Core Imports

11

12

```python

13

from fastapi_mqtt import FastMQTT, MQTTConfig

14

```

15

16

Full import with MQTTClient:

17

18

```python

19

from fastapi_mqtt import FastMQTT, MQTTConfig, MQTTClient

20

```

21

22

For handler type hints:

23

24

```python

25

from gmqtt import Client as MQTTClient

26

```

27

28

## Basic Usage

29

30

```python

31

from contextlib import asynccontextmanager

32

from typing import Any

33

from fastapi import FastAPI

34

from gmqtt import Client as MQTTClient

35

from fastapi_mqtt import FastMQTT, MQTTConfig

36

37

# Configure MQTT connection

38

mqtt_config = MQTTConfig(

39

host="mqtt.broker.com",

40

port=1883,

41

keepalive=60,

42

username="user",

43

password="password"

44

)

45

46

# Create FastMQTT instance

47

fast_mqtt = FastMQTT(config=mqtt_config)

48

49

# FastAPI lifespan management

50

@asynccontextmanager

51

async def lifespan(app: FastAPI):

52

await fast_mqtt.mqtt_startup()

53

yield

54

await fast_mqtt.mqtt_shutdown()

55

56

app = FastAPI(lifespan=lifespan)

57

58

# Connection handler

59

@fast_mqtt.on_connect()

60

def connect(client: MQTTClient, flags: int, rc: int, properties: Any):

61

client.subscribe("/mqtt/data")

62

print("Connected to MQTT broker")

63

64

# Topic-specific subscription

65

@fast_mqtt.subscribe("sensors/+/temperature", qos=1)

66

async def temperature_handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):

67

temp_data = payload.decode()

68

print(f"Temperature from {topic}: {temp_data}")

69

70

# Global message handler

71

@fast_mqtt.on_message()

72

async def message_handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any):

73

print(f"Message from {topic}: {payload.decode()}")

74

75

# API endpoint to publish messages

76

@app.get("/publish/{topic}/{message}")

77

async def publish_message(topic: str, message: str):

78

fast_mqtt.publish(topic, message)

79

return {"status": "published", "topic": topic, "message": message}

80

```

81

82

## Architecture

83

84

FastAPI-MQTT follows a decorator-based pattern for event handling:

85

86

- **FastMQTT**: Main client class managing MQTT connections and subscriptions

87

- **MQTTConfig**: Pydantic-based configuration for connection parameters

88

- **Decorators**: Event handlers for connection, subscription, and message events

89

- **Handler Management**: Internal system for organizing user-defined callbacks

90

- **Topic Matching**: Built-in support for MQTT wildcards (+ and #) and shared subscriptions

91

92

## Capabilities

93

94

### Configuration

95

96

MQTT connection configuration using Pydantic BaseModel with support for authentication, SSL, reconnection, and last will messages.

97

98

```python { .api }

99

class MQTTConfig:

100

"""

101

MQTT connection configuration.

102

103

Parameters:

104

- host: MQTT broker hostname (default: "localhost")

105

- port: MQTT broker port (default: 1883)

106

- ssl: SSL/TLS configuration (bool or SSLContext, default: False)

107

- keepalive: Keep-alive interval in seconds (default: 60)

108

- username: Authentication username (optional)

109

- password: Authentication password (optional)

110

- version: MQTT protocol version (default: MQTTv50, range: 4-5)

111

- reconnect_retries: Number of reconnection attempts (default: 1)

112

- reconnect_delay: Delay between reconnections in seconds (default: 6)

113

- will_message_topic: Last will message topic (optional)

114

- will_message_payload: Last will message payload (optional)

115

- will_delay_interval: Last will delay interval (optional)

116

"""

117

host: str = "localhost"

118

port: int = 1883

119

ssl: Union[bool, SSLContext] = False

120

keepalive: int = 60

121

username: Optional[str] = None

122

password: Optional[str] = None

123

version: int = Field(default=MQTTv50, ge=4, le=5)

124

reconnect_retries: Optional[int] = 1

125

reconnect_delay: Optional[int] = 6

126

will_message_topic: Optional[str] = None

127

will_message_payload: Optional[str] = None

128

will_delay_interval: Optional[int] = None

129

```

130

131

### Client Initialization

132

133

Main FastMQTT client with customizable connection parameters and logging.

134

135

```python { .api }

136

class FastMQTT:

137

def __init__(

138

self,

139

config: MQTTConfig,

140

*,

141

client_id: Optional[str] = None,

142

clean_session: bool = True,

143

optimistic_acknowledgement: bool = True,

144

mqtt_logger: Optional[logging.Logger] = None,

145

**kwargs: Any,

146

) -> None:

147

"""

148

Initialize FastMQTT client.

149

150

Parameters:

151

- config: MQTTConfig instance with connection parameters

152

- client_id: Unique client identifier (auto-generated if None)

153

- clean_session: Enable persistent session (default: True)

154

- optimistic_acknowledgement: MQTT acknowledgement behavior (default: True)

155

- mqtt_logger: Custom logger instance (optional)

156

- **kwargs: Additional gmqtt client parameters

157

"""

158

```

159

160

### Connection Management

161

162

Methods for establishing and managing MQTT broker connections with FastAPI lifecycle integration.

163

164

```python { .api }

165

async def connection(self) -> None:

166

"""Establish connection to MQTT broker with authentication and configuration."""

167

168

async def mqtt_startup(self) -> None:

169

"""Initial connection method for FastAPI lifespan startup."""

170

171

async def mqtt_shutdown(self) -> None:

172

"""Final disconnection method for FastAPI lifespan shutdown."""

173

174

def init_app(self, fastapi_app) -> None:

175

"""Legacy method to add startup/shutdown event handlers (deprecated)."""

176

```

177

178

### Message Publishing

179

180

Publish messages to MQTT topics with quality of service and retention options.

181

182

```python { .api }

183

def publish(

184

self,

185

message_or_topic: str,

186

payload: Any = None,

187

qos: int = 0,

188

retain: bool = False,

189

**kwargs,

190

) -> None:

191

"""

192

Publish message to MQTT broker.

193

194

Parameters:

195

- message_or_topic: Topic name

196

- payload: Message payload (any serializable type)

197

- qos: Quality of Service level (0, 1, or 2)

198

- retain: Retain message on broker (default: False)

199

- **kwargs: Additional publish parameters

200

"""

201

```

202

203

### Topic Subscription

204

205

Subscribe to MQTT topics with pattern matching and quality of service configuration.

206

207

```python { .api }

208

def subscribe(

209

self,

210

*topics,

211

qos: int = 0,

212

no_local: bool = False,

213

retain_as_published: bool = False,

214

retain_handling_options: int = 0,

215

subscription_identifier: Any = None,

216

) -> Callable[..., Any]:

217

"""

218

Decorator for subscribing to specific MQTT topics.

219

220

Parameters:

221

- *topics: Topic names (supports MQTT wildcards + and #)

222

- qos: Quality of Service level (0, 1, or 2)

223

- no_local: Don't receive own published messages (MQTT 5.0)

224

- retain_as_published: Retain flag handling (MQTT 5.0)

225

- retain_handling_options: Retained message behavior (MQTT 5.0)

226

- subscription_identifier: Subscription identifier (MQTT 5.0)

227

228

Returns:

229

Decorator function for message handler

230

"""

231

232

def unsubscribe(self, topic: str, **kwargs):

233

"""

234

Unsubscribe from MQTT topic.

235

236

Parameters:

237

- topic: Topic name to unsubscribe from

238

- **kwargs: Additional unsubscribe parameters

239

"""

240

```

241

242

### Event Handlers

243

244

Decorator methods for handling MQTT connection lifecycle and message events.

245

246

```python { .api }

247

def on_connect(self) -> Callable[..., Any]:

248

"""

249

Decorator for MQTT connection event handler.

250

251

Handler signature:

252

def handler(client: MQTTClient, flags: int, rc: int, properties: Any) -> Any

253

"""

254

255

def on_message(self) -> Callable[..., Any]:

256

"""

257

Decorator for global MQTT message handler (all topics).

258

259

Handler signature:

260

async def handler(client: MQTTClient, topic: str, payload: bytes, qos: int, properties: Any) -> Any

261

"""

262

263

def on_disconnect(self) -> Callable[..., Any]:

264

"""

265

Decorator for MQTT disconnection event handler.

266

267

Handler signature:

268

def handler(client: MQTTClient, packet: bytes, exc: Optional[Exception]) -> Any

269

"""

270

271

def on_subscribe(self) -> Callable[..., Any]:

272

"""

273

Decorator for MQTT subscription acknowledgment handler.

274

275

Handler signature:

276

def handler(client: MQTTClient, mid: int, qos: int, properties: Any) -> Any

277

"""

278

```

279

280

### Topic Pattern Matching

281

282

Static method for matching MQTT topics against wildcard patterns.

283

284

```python { .api }

285

@staticmethod

286

def match(topic: str, template: str) -> bool:

287

"""

288

Match MQTT topic against template with wildcards.

289

290

Parameters:

291

- topic: Actual topic name

292

- template: Template with wildcards (+ for single level, # for multi-level)

293

294

Returns:

295

True if topic matches template pattern

296

297

Supports:

298

- Single-level wildcard (+): matches one topic level

299

- Multi-level wildcard (#): matches multiple topic levels

300

- Shared subscriptions ($share/group/topic)

301

"""

302

```

303

304

## Types

305

306

```python { .api }

307

from ssl import SSLContext

308

from typing import Any, Awaitable, Callable, Optional, Union

309

from gmqtt import Client as MQTTClient

310

from gmqtt.mqtt.constants import MQTTv50

311

from pydantic import Field

312

313

# Handler type definitions

314

MQTTMessageHandler = Callable[[MQTTClient, str, bytes, int, Any], Awaitable[Any]]

315

MQTTConnectionHandler = Callable[[MQTTClient, int, int, Any], Any]

316

MQTTSubscriptionHandler = Callable[[MQTTClient, int, int, Any], Any]

317

MQTTDisconnectHandler = Callable[[MQTTClient, bytes, Optional[Exception]], Any]

318

319

# Configuration types

320

Union[bool, SSLContext] # For ssl parameter

321

Field(default=MQTTv50, ge=4, le=5) # For version parameter with validation

322

```

323

324

## Error Handling

325

326

FastAPI-MQTT propagates exceptions from the underlying gmqtt client. Common exceptions include:

327

328

- **Connection errors**: Network connectivity issues, invalid broker address

329

- **Authentication failures**: Invalid username/password credentials

330

- **Protocol errors**: MQTT protocol version mismatches, malformed messages

331

- **Timeout errors**: Connection timeout, keep-alive timeout

332

333

Handle these in your application code:

334

335

```python

336

try:

337

await fast_mqtt.mqtt_startup()

338

except Exception as e:

339

print(f"MQTT connection failed: {e}")

340

```