or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication-security.md · channel-operations.md · connection-adapters.md · connection-management.md · exception-handling.md · index.md · message-properties-types.md

connection-management.md · docs/

0

# Connection Management

1

2

Pika provides comprehensive connection configuration and management, with multiple connection adapter types that support different networking approaches for connecting to RabbitMQ brokers.

3

4

## Capabilities

5

6

### Connection Parameters

7

8

Configure connection settings including host, port, credentials, timeouts, and protocol options.

9

10

```python { .api }

11

class ConnectionParameters:

12

"""Connection parameters for AMQP connections."""

13

14

def __init__(self, host='localhost', port=5672, virtual_host='/',

15

credentials=None, channel_max=65535, frame_max=131072,

16

heartbeat=None, ssl_options=None, connection_attempts=1,

17

retry_delay=2.0, socket_timeout=10.0, stack_timeout=15.0,

18

locale='en_US', blocked_connection_timeout=None,

19

client_properties=None, tcp_options=None):

20

"""

21

Parameters:

22

- host (str): RabbitMQ server hostname

23

- port (int): RabbitMQ server port (default: 5672)

24

- virtual_host (str): Virtual host (default: '/')

25

- credentials (Credentials): Authentication credentials

26

- channel_max (int): Maximum number of channels (default: 65535)

27

- frame_max (int): Maximum frame size in bytes (default: 131072)

28

- heartbeat (int): Heartbeat interval in seconds

29

- ssl_options (SSLOptions): SSL configuration

30

- connection_attempts (int): Number of connection attempts (default: 1)

31

- retry_delay (float): Delay between attempts in seconds (default: 2.0)

32

- socket_timeout (float): Socket connection timeout (default: 10.0)

33

- stack_timeout (float): Full stack timeout (default: 15.0)

34

- locale (str): Connection locale (default: 'en_US')

35

- blocked_connection_timeout (float): Blocked connection timeout

36

- client_properties (dict): Client identification properties

37

- tcp_options (dict): TCP socket options

38

"""

39

40

# Properties for all parameters

41

@property

42

def host(self) -> str: ...

43

44

@host.setter

45

def host(self, value: str): ...

46

47

@property

48

def port(self) -> int: ...

49

50

@port.setter

51

def port(self, value: int): ...

52

53

@property

54

def virtual_host(self) -> str: ...

55

56

@virtual_host.setter

57

def virtual_host(self, value: str): ...

58

59

@property

60

def credentials(self): ...

61

62

@credentials.setter

63

def credentials(self, value): ...

64

65

@property

66

def channel_max(self) -> int: ...

67

68

@channel_max.setter

69

def channel_max(self, value: int): ...

70

71

@property

72

def frame_max(self) -> int: ...

73

74

@frame_max.setter

75

def frame_max(self, value: int): ...

76

77

@property

78

def heartbeat(self) -> int: ...

79

80

@heartbeat.setter

81

def heartbeat(self, value: int): ...

82

83

@property

84

def ssl_options(self): ...

85

86

@ssl_options.setter

87

def ssl_options(self, value): ...

88

89

@property

90

def connection_attempts(self) -> int: ...

91

92

@connection_attempts.setter

93

def connection_attempts(self, value: int): ...

94

95

@property

96

def retry_delay(self) -> float: ...

97

98

@retry_delay.setter

99

def retry_delay(self, value: float): ...

100

101

@property

102

def socket_timeout(self) -> float: ...

103

104

@socket_timeout.setter

105

def socket_timeout(self, value: float): ...

106

107

@property

108

def stack_timeout(self) -> float: ...

109

110

@stack_timeout.setter

111

def stack_timeout(self, value: float): ...

112

113

@property

114

def locale(self) -> str: ...

115

116

@locale.setter

117

def locale(self, value: str): ...

118

119

@property

120

def blocked_connection_timeout(self) -> float: ...

121

122

@blocked_connection_timeout.setter

123

def blocked_connection_timeout(self, value: float): ...

124

125

@property

126

def client_properties(self) -> dict: ...

127

128

@client_properties.setter

129

def client_properties(self, value: dict): ...

130

131

@property

132

def tcp_options(self) -> dict: ...

133

134

@tcp_options.setter

135

def tcp_options(self, value: dict): ...

136

```

137

138

### URL-Based Parameters

139

140

Create connection parameters from AMQP URLs with automatic parsing of connection details.

141

142

```python { .api }

143

class URLParameters(ConnectionParameters):

144

"""Connection parameters from AMQP URL."""

145

146

def __init__(self, url):

147

"""

148

Create connection parameters from AMQP URL.

149

150

Parameters:

151

- url (str): AMQP URL in format: amqp://username:password@host:port/<virtual_host>[?query-string]

152

153

URL query parameters:

154

- channel_max: Maximum number of channels

155

- client_properties: Client properties as JSON

156

- connection_attempts: Number of connection attempts

157

- frame_max: Maximum frame size

158

- heartbeat: Heartbeat interval

159

- locale: Connection locale

160

- ssl_options: SSL options as JSON

161

- retry_delay: Retry delay in seconds

162

- socket_timeout: Socket timeout in seconds

163

- stack_timeout: Stack timeout in seconds

164

- blocked_connection_timeout: Blocked connection timeout

165

- tcp_options: TCP options as JSON

166

"""

167

```

168

169

### Blocking Connection

170

171

Synchronous connection adapter for simple blocking operations.

172

173

```python { .api }

174

class BlockingConnection:

175

"""Synchronous connection to RabbitMQ."""

176

177

def __init__(self, parameters):

178

"""

179

Create blocking connection.

180

181

Parameters:

182

- parameters (ConnectionParameters or URLParameters): Connection configuration

183

"""

184

185

def channel(self, channel_number=None):

186

"""

187

Create a new channel.

188

189

Parameters:

190

- channel_number (int, optional): Specific channel number to use

191

192

Returns:

193

- BlockingChannel: New channel instance

194

"""

195

196

def close(self):

197

"""Close the connection."""

198

199

def process_data_events(self, time_limit=0):

200

"""

201

Process pending data events.

202

203

Parameters:

204

- time_limit (float): Maximum time to process events (default: 0 for non-blocking)

205

"""

206

207

def sleep(self, duration):

208

"""

209

Sleep while processing connection events.

210

211

Parameters:

212

- duration (float): Sleep duration in seconds

213

"""

214

215

def add_callback_threadsafe(self, callback):

216

"""

217

Add callback to be executed in connection thread.

218

219

Parameters:

220

- callback (callable): Callback function to execute

221

"""

222

223

def call_later(self, delay, callback, *args):

224

"""

225

Schedule callback execution after delay.

226

227

Parameters:

228

- delay (float): Delay in seconds

229

- callback (callable): Callback function

230

- *args: Arguments for callback

231

232

Returns:

233

- object: Handle for canceling the call

234

"""

235

236

def remove_timeout(self, timeout_id):

237

"""

238

Remove scheduled timeout.

239

240

Parameters:

241

- timeout_id: Timeout handle to remove

242

"""

243

244

def update_secret(self, new_secret, reason):

245

"""

246

Update connection credentials.

247

248

Parameters:

249

- new_secret (str): New password/secret

250

- reason (str): Reason for update

251

"""

252

253

def add_on_connection_blocked_callback(self, callback):

254

"""

255

Add callback for connection blocked events.

256

257

Parameters:

258

- callback (callable): Callback function receiving (connection, method)

259

"""

260

261

def add_on_connection_unblocked_callback(self, callback):

262

"""

263

Add callback for connection unblocked events.

264

265

Parameters:

266

- callback (callable): Callback function receiving (connection, method)

267

"""

268

269

# Connection state properties

270

@property

271

def is_closed(self) -> bool:

272

"""True if connection is closed."""

273

274

@property

275

def is_open(self) -> bool:

276

"""True if connection is open."""

277

278

# Server capability properties

279

@property

280

def basic_nack_supported(self) -> bool:

281

"""True if server supports basic.nack."""

282

283

@property

284

def consumer_cancel_notify_supported(self) -> bool:

285

"""True if server supports consumer cancel notifications."""

286

287

@property

288

def exchange_exchange_bindings_supported(self) -> bool:

289

"""True if server supports exchange-to-exchange bindings."""

290

291

@property

292

def publisher_confirms_supported(self) -> bool:

293

"""True if server supports publisher confirms."""

294

```

295

296

### Select Connection

297

298

Event-driven connection adapter using select/poll/epoll for asynchronous operations.

299

300

```python { .api }

301

class SelectConnection:

302

"""Event-driven connection using select/poll/epoll."""

303

304

def __init__(self, parameters, on_open_callback=None, on_open_error_callback=None,

305

on_close_callback=None, custom_ioloop=None):

306

"""

307

Create select-based connection.

308

309

Parameters:

310

- parameters (ConnectionParameters): Connection configuration

311

- on_open_callback (callable): Called when connection opens

312

- on_open_error_callback (callable): Called on connection error

313

- on_close_callback (callable): Called when connection closes

314

- custom_ioloop (IOLoop): Event loop instance to use (creates default if None)

315

"""

316

317

def channel(self, on_open_callback, channel_number=None):

318

"""

319

Create a new channel asynchronously.

320

321

Parameters:

322

- on_open_callback (callable): Called when channel opens

323

- channel_number (int, optional): Specific channel number

324

"""

325

326

def close(self, reply_code=200, reply_text='Normal shutdown'):

327

"""

328

Close connection asynchronously.

329

330

Parameters:

331

- reply_code (int): AMQP reply code (default: 200)

332

- reply_text (str): Human-readable close reason

333

"""

334

335

@property

def ioloop(self):

336

"""

337

Get the IOLoop instance.

338

339

Returns:

340

- IOLoop: Event loop instance

341

"""

342

```

343

344

### Connection Workflow

345

346

Default AMQP connection establishment workflow with retry and timeout handling.

347

348

```python { .api }

349

class AMQPConnectionWorkflow:

350

"""Default connection establishment workflow."""

351

352

def __init__(self, parameters, on_done_callback, nbio_interface):

353

"""

354

Create connection workflow.

355

356

Parameters:

357

- parameters (ConnectionParameters): Connection configuration

358

- on_done_callback (callable): Called when workflow completes

359

- nbio_interface: Non-blocking I/O interface

360

"""

361

362

def start(self):

363

"""Start the connection workflow."""

364

365

def abort(self):

366

"""Abort the connection workflow."""

367

```

368

369

## Usage Examples

370

371

### Basic Connection

372

373

```python

374

import pika

375

376

# Simple connection

377

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

378

channel = connection.channel()

379

380

# Use the channel...

381

382

connection.close()

383

```

384

385

### Connection with Authentication

386

387

```python

388

import pika

389

390

credentials = pika.PlainCredentials('username', 'password')

391

parameters = pika.ConnectionParameters(

392

host='rabbitmq.example.com',

393

port=5672,

394

virtual_host='/app',

395

credentials=credentials

396

)

397

398

connection = pika.BlockingConnection(parameters)

399

```

400

401

### URL-Based Connection

402

403

```python

404

import pika

405

406

# Connection from URL

407

url = 'amqp://user:pass@localhost:5672/%2F?heartbeat=300'

408

connection = pika.BlockingConnection(pika.URLParameters(url))

409

```

410

411

### Connection with SSL

412

413

```python

414

import pika

415

import ssl

416

417

context = ssl.create_default_context()

418

ssl_options = pika.SSLOptions(context, 'rabbitmq.example.com')

419

420

parameters = pika.ConnectionParameters(

421

host='rabbitmq.example.com',

422

port=5671,

423

ssl_options=ssl_options

424

)

425

426

connection = pika.BlockingConnection(parameters)

427

```

428

429

### Asynchronous Connection

430

431

```python

432

import pika

433

434

def on_open(connection):

435

print('Connection opened')

436

connection.channel(on_open_callback=on_channel_open)

437

438

def on_open_error(connection, error):

439

print(f'Connection failed: {error}')

440

441

def on_close(connection, reason):

442

print(f'Connection closed: {reason}')

443

444

def on_channel_open(channel):

445

print('Channel opened')

446

# Use channel...

447

448

parameters = pika.ConnectionParameters('localhost')

449

connection = pika.SelectConnection(

450

parameters,

451

on_open_callback=on_open,

452

on_open_error_callback=on_open_error,

453

on_close_callback=on_close

454

)

455

456

# Start the IOLoop

457

connection.ioloop.start()

458

```