or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-security.mdcore-client.mderror-handling.mdindex.mdmessage-handling.mdtopic-management.md

core-client.mddocs/

0

# Core Client Operations

1

2

Essential MQTT client functionality including connection management, publishing, and subscribing. The Client class serves as the main entry point for all MQTT operations using async context manager patterns.

3

4

## Capabilities

5

6

### Client Connection Management

7

8

The Client class provides async context manager support for automatic connection and disconnection management, eliminating the need for manual connection lifecycle handling.

9

10

```python { .api }

11

class Client:

12

def __init__(

13

self,

14

hostname: str,

15

port: int = 1883,

16

*,

17

username: str | None = None,

18

password: str | None = None,

19

logger: logging.Logger | None = None,

20

identifier: str | None = None,

21

queue_type: type[asyncio.Queue[Message]] | None = None,

22

protocol: ProtocolVersion | None = None,

23

will: Will | None = None,

24

clean_session: bool | None = None,

25

transport: Literal["tcp", "websockets", "unix"] = "tcp",

26

timeout: float | None = None,

27

keepalive: int = 60,

28

bind_address: str = "",

29

bind_port: int = 0,

30

clean_start: mqtt.CleanStartOption = mqtt.MQTT_CLEAN_START_FIRST_ONLY,

31

max_queued_incoming_messages: int | None = None,

32

max_queued_outgoing_messages: int | None = None,

33

max_inflight_messages: int | None = None,

34

max_concurrent_outgoing_calls: int | None = None,

35

properties: Properties | None = None,

36

tls_context: ssl.SSLContext | None = None,

37

tls_params: TLSParameters | None = None,

38

tls_insecure: bool | None = None,

39

proxy: ProxySettings | None = None,

40

socket_options: Iterable[SocketOption] | None = None,

41

websocket_path: str | None = None,

42

websocket_headers: WebSocketHeaders | None = None,

43

):

44

"""

45

Initialize MQTT client with connection parameters.

46

47

Args:

48

hostname (str): MQTT broker hostname or IP address

49

port (int): MQTT broker port, defaults to 1883

50

username (str, optional): Authentication username

51

password (str, optional): Authentication password

52

logger (logging.Logger, optional): Custom logger instance

53

identifier (str, optional): Client ID, auto-generated if None

54

queue_type (type, optional): Custom message queue class

55

protocol (ProtocolVersion, optional): MQTT protocol version

56

will (Will, optional): Last will and testament message

57

clean_session (bool, optional): Clean session flag for MQTT v3.1.1

58

transport (str): Transport protocol - "tcp", "websockets", or "unix"

59

timeout (float, optional): Default timeout for operations

60

keepalive (int): Keep-alive interval in seconds

61

bind_address (str): Local interface to bind to

62

bind_port (int): Local port to bind to

63

clean_start (mqtt.CleanStartOption): MQTT v5.0 clean start option

64

max_queued_incoming_messages (int, optional): Incoming message queue limit

65

max_queued_outgoing_messages (int, optional): Outgoing message queue limit

66

max_inflight_messages (int, optional): Maximum inflight messages

67

max_concurrent_outgoing_calls (int, optional): Concurrency limit

68

properties (Properties, optional): MQTT v5.0 connection properties

69

tls_context (ssl.SSLContext, optional): Pre-configured SSL context

70

tls_params (TLSParameters, optional): SSL/TLS configuration parameters

71

tls_insecure (bool, optional): Disable hostname verification

72

proxy (ProxySettings, optional): Proxy configuration

73

socket_options (Iterable, optional): Socket options

74

websocket_path (str, optional): WebSocket path for websocket transport

75

websocket_headers (WebSocketHeaders, optional): WebSocket headers

76

"""

77

78

async def __aenter__(self) -> Self:

79

"""

80

Connect to MQTT broker when entering async context.

81

82

Returns:

83

Self: The connected client instance

84

85

Raises:

86

MqttError: If connection fails

87

"""

88

89

async def __aexit__(

90

self,

91

exc_type: type[BaseException] | None,

92

exc: BaseException | None,

93

tb: TracebackType | None,

94

) -> None:

95

"""

96

Disconnect from MQTT broker when exiting async context.

97

98

Args:

99

exc_type: Exception type if context exited with exception

100

exc: Exception instance if context exited with exception

101

tb: Traceback if context exited with exception

102

"""

103

```

104

105

**Usage example:**

106

107

```python

108

import asyncio

109

from aiomqtt import Client

110

111

async def basic_connection():

112

# Automatic connection and disconnection

113

async with Client("test.mosquitto.org") as client:

114

print(f"Connected with client ID: {client.identifier}")

115

# Client automatically disconnects when exiting context

116

117

asyncio.run(basic_connection())

118

```

119

120

### Message Publishing

121

122

Publish messages to MQTT topics with support for quality of service, retained messages, and MQTT v5.0 properties.

123

124

```python { .api }

125

async def publish(

126

self,

127

/,

128

topic: str,

129

payload: PayloadType = None,

130

qos: int = 0,

131

retain: bool = False,

132

properties: Properties | None = None,

133

*args: Any,

134

timeout: float | None = None,

135

**kwargs: Any,

136

) -> None:

137

"""

138

Publish a message to an MQTT topic.

139

140

Args:

141

topic (str): Target topic for the message

142

payload (PayloadType, optional): Message payload, defaults to None

143

qos (int): Quality of service level (0, 1, or 2), defaults to 0

144

retain (bool): Whether to retain the message on the broker, defaults to False

145

properties (Properties, optional): MQTT v5.0 message properties

146

timeout (float, optional): Operation timeout, uses client default if None

147

148

Raises:

149

MqttError: If publish operation fails

150

MqttCodeError: If broker returns an error code

151

"""

152

```

153

154

**Usage examples:**

155

156

```python

157

import asyncio

158

from aiomqtt import Client

159

160

async def publish_examples():

161

async with Client("test.mosquitto.org") as client:

162

# Simple text message

163

await client.publish("sensors/temperature", "23.5")

164

165

# Binary payload

166

await client.publish("sensors/image", b"binary_image_data")

167

168

# Numeric payload

169

await client.publish("sensors/humidity", 65.2)

170

171

# QoS 1 with retain flag

172

await client.publish(

173

"status/online",

174

"connected",

175

qos=1,

176

retain=True

177

)

178

179

# With custom timeout

180

await client.publish(

181

"slow/topic",

182

"data",

183

timeout=10.0

184

)

185

186

asyncio.run(publish_examples())

187

```

188

189

### Topic Subscription

190

191

Subscribe to MQTT topics and wildcards with support for multiple QoS levels and subscription options.

192

193

```python { .api }

194

async def subscribe(

195

self,

196

/,

197

topic: SubscribeTopic,

198

qos: int = 0,

199

options: SubscribeOptions | None = None,

200

properties: Properties | None = None,

201

*args: Any,

202

timeout: float | None = None,

203

**kwargs: Any,

204

) -> tuple[int, ...] | list[ReasonCode]:

205

"""

206

Subscribe to one or more MQTT topics.

207

208

Args:

209

topic: Topic(s) to subscribe to - can be:

210

- str: Single topic or wildcard pattern

211

- Topic: Single topic object

212

- Wildcard: Single wildcard object

213

- list: Multiple topics with optional QoS specifications

214

qos (int): Quality of service level, defaults to 0

215

options (SubscribeOptions, optional): MQTT v5.0 subscription options

216

properties (Properties, optional): MQTT v5.0 subscription properties

217

timeout (float, optional): Operation timeout, uses client default if None

218

219

Returns:

220

tuple[int, ...] | list[ReasonCode]: Granted QoS levels or reason codes

221

222

Raises:

223

MqttError: If subscription operation fails

224

MqttCodeError: If broker returns an error code

225

"""

226

227

async def unsubscribe(

228

self,

229

topic: str | Topic | Wildcard | list[str | Topic | Wildcard],

230

properties: Properties | None = None,

231

timeout: float | None = None,

232

) -> None:

233

"""

234

Unsubscribe from one or more MQTT topics.

235

236

Args:

237

topic: Topic(s) to unsubscribe from

238

properties (Properties, optional): MQTT v5.0 properties

239

timeout (float, optional): Operation timeout, uses client default if None

240

241

Raises:

242

MqttError: If unsubscribe operation fails

243

"""

244

```

245

246

**Usage examples:**

247

248

```python

249

import asyncio

250

from aiomqtt import Client

251

252

async def subscription_examples():

253

async with Client("test.mosquitto.org") as client:

254

# Simple topic subscription

255

await client.subscribe("sensors/temperature")

256

257

# Wildcard subscriptions

258

await client.subscribe("sensors/+/temperature") # Single level wildcard

259

await client.subscribe("sensors/#") # Multi-level wildcard

260

261

# Multiple topics with different QoS

262

await client.subscribe([

263

("sensors/temperature", 0),

264

("sensors/humidity", 1),

265

("alerts/#", 2)

266

])

267

268

# QoS 1 subscription

269

await client.subscribe("important/data", qos=1)

270

271

# Unsubscribe

272

await client.unsubscribe("sensors/temperature")

273

await client.unsubscribe(["sensors/+/temp", "alerts/#"])

274

275

asyncio.run(subscription_examples())

276

```

277

278

### Message Reception

279

280

Access received messages through the async iterator interface provided by the messages property.

281

282

```python { .api }

283

@property

284

def messages(self) -> MessagesIterator:

285

"""

286

Get async iterator for received messages.

287

288

Returns:

289

MessagesIterator: Iterator for received messages

290

"""

291

292

class MessagesIterator:

293

def __aiter__(self) -> AsyncIterator[Message]:

294

"""Return async iterator."""

295

296

def __anext__(self) -> Message:

297

"""Get next message from queue."""

298

299

def __len__(self) -> int:

300

"""

301

Get number of queued messages.

302

303

Returns:

304

int: Number of messages in queue

305

"""

306

```

307

308

**Usage example:**

309

310

```python

311

import asyncio

312

from aiomqtt import Client

313

314

async def message_reception():

315

async with Client("test.mosquitto.org") as client:

316

await client.subscribe("sensors/#")

317

318

# Receive messages indefinitely

319

async for message in client.messages:

320

print(f"Topic: {message.topic}")

321

print(f"Payload: {message.payload}")

322

print(f"QoS: {message.qos}")

323

324

# Process specific topics

325

if message.topic.matches("sensors/temperature"):

326

temperature = float(message.payload)

327

print(f"Temperature: {temperature}°C")

328

329

# Check queue length

330

print(f"Messages in queue: {len(client.messages)}")

331

332

asyncio.run(message_reception())

333

```

334

335

### Client Properties

336

337

Access client configuration and status information.

338

339

```python { .api }

340

@property

341

def identifier(self) -> str:

342

"""

343

Get client identifier.

344

345

Returns:

346

str: MQTT client identifier

347

"""

348

349

@property

350

def pending_calls_threshold(self) -> int:

351

"""

352

Get warning threshold for pending calls.

353

354

Returns:

355

int: Threshold value for pending calls warning

356

"""

357

358

@property

359

def timeout(self) -> float:

360

"""

361

Get default timeout value.

362

363

Returns:

364

float: Default timeout in seconds

365

"""

366

```

367

368

## Type Definitions

369

370

```python { .api }

371

PayloadType = str | bytes | bytearray | int | float | None

372

373

SubscribeTopic = (

374

str

375

| tuple[str, SubscribeOptions]

376

| list[tuple[str, SubscribeOptions]]

377

| list[tuple[str, int]]

378

)

379

380

WebSocketHeaders = dict[str, str] | Callable[[dict[str, str]], dict[str, str]]

381

382

PahoSocket = socket.socket | ssl.SSLSocket | Any

383

384

SocketOption = tuple[int, int, int | bytes] | tuple[int, int, None, int]

385

```