or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

compression.md connection.md entities.md exceptions.md index.md messaging.md mixins.md pools.md serialization.md simple.md

docs/connection.md

0

# Connection Management

1

2

Robust connection handling with pooling, retry logic, and failover support for connecting to message brokers across multiple transport backends. Kombu's connection management provides automatic reconnection, resource pooling, and comprehensive error handling for reliable messaging operations.

3

4

## Capabilities

5

6

### Connection Class

7

8

Primary connection class for establishing and managing broker connections with support for automatic reconnection, channel management, and transport abstraction.

9

10

```python { .api }

11

class Connection:

12

def __init__(self, hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy='round-robin', alternates=None, **kwargs):

13

"""

14

Create connection to message broker.

15

16

Parameters:

17

- hostname (str): Broker hostname (default 'localhost')

18

- userid (str): Username for authentication

19

- password (str): Password for authentication

20

- virtual_host (str): Virtual host (AMQP concept)

21

- port (int): Broker port number

22

- insist (bool): Insist on connection (deprecated)

23

- ssl (bool|dict): SSL configuration

24

- transport (str): Transport backend name

25

- connect_timeout (float): Connection timeout in seconds

26

- transport_options (dict): Transport-specific options

27

- login_method (str): SASL login method

28

- uri_prefix (str): URI prefix for transport

29

- heartbeat (int): Heartbeat interval in seconds (0=disabled)

30

- failover_strategy (str): Strategy for multiple hosts

31

- alternates (list): Alternative broker URLs

32

- **kwargs: Additional connection parameters

33

"""

34

35

def connect(self):

36

"""

37

Establish connection to the broker immediately.

38

39

Returns:

40

Connection instance for chaining

41

"""

42

43

def channel(self):

44

"""

45

Create and return a new channel.

46

47

Returns:

48

Channel instance

49

"""

50

51

def drain_events(self, timeout=None):

52

"""

53

Wait for a single event from the server.

54

55

Parameters:

56

- timeout (float): Timeout in seconds

57

58

Returns:

59

Event data or raises socket.timeout

60

"""

61

62

def ensure_connection(self, errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30):

63

"""

64

Ensure connection is established with retry logic.

65

66

Parameters:

67

- errback (callable): Error callback function

68

- max_retries (int): Maximum retry attempts

69

- interval_start (float): Initial retry interval

70

- interval_step (float): Interval increase per retry

71

- interval_max (float): Maximum retry interval

72

73

Returns:

74

The Connection instance itself (which also works as a context manager)

75

"""

76

77

def ensure(self, obj, fun, errback=None, max_retries=None, **retry_policy):

78

"""

79

Ensure operation completes despite connection errors.

80

81

Parameters:

82

- obj: Object to call method on

83

- fun (callable): Method to apply (typically a bound method of obj)

84

- errback (callable): Error callback

85

- max_retries (int): Maximum retries

86

- retry_policy: Additional retry parameters

87

88

Returns:

89

Wrapped callable that applies fun with retry logic; call it with the operation's arguments to get the result

90

"""

91

92

def heartbeat_check(self, rate=2):

93

"""

94

Check heartbeats at specified rate.

95

96

Parameters:

97

- rate (int): How often the check is called relative to the heartbeat interval (e.g. 2 means twice per heartbeat period)

98

"""

99

100

def close(self):

101

"""Close the connection and cleanup resources."""

102

103

def release(self):

104

"""Release connection back to pool."""

105

106

# Connection factory methods

107

def Pool(self, limit=None, preload=None):

108

"""

109

Create connection pool.

110

111

Parameters:

112

- limit (int): Maximum pool size

113

- preload (int): Number of connections to preload

114

115

Returns:

116

ConnectionPool instance

117

"""

118

119

def ChannelPool(self, limit=None, preload=None):

120

"""

121

Create channel pool.

122

123

Parameters:

124

- limit (int): Maximum pool size

125

- preload (int): Number of channels to preload

126

127

Returns:

128

ChannelPool instance

129

"""

130

131

def Producer(self, channel=None, *args, **kwargs):

132

"""

133

Create Producer instance.

134

135

Parameters:

136

- channel: Channel to use (uses default_channel if None)

137

138

Returns:

139

Producer instance

140

"""

141

142

def Consumer(self, queues, channel=None, *args, **kwargs):

143

"""

144

Create Consumer instance.

145

146

Parameters:

147

- queues: Queues to consume from

148

- channel: Channel to use (uses default_channel if None)

149

150

Returns:

151

Consumer instance

152

"""

153

154

def SimpleQueue(self, name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs):

155

"""

156

Create SimpleQueue instance.

157

158

Parameters:

159

- name (str): Queue name

160

- no_ack (bool): Disable acknowledgments

161

- queue_opts (dict): Queue options

162

- exchange_opts (dict): Exchange options

163

- channel: Channel to use

164

165

Returns:

166

SimpleQueue instance

167

"""

168

169

def SimpleBuffer(self, name, no_ack=True, queue_opts=None, exchange_opts=None, channel=None, **kwargs):

170

"""

171

Create SimpleBuffer instance (ephemeral queue).

172

173

Parameters:

174

- name (str): Queue name

175

- no_ack (bool): Disable acknowledgments (default True)

176

- queue_opts (dict): Queue options

177

- exchange_opts (dict): Exchange options

178

- channel: Channel to use

179

180

Returns:

181

SimpleBuffer instance

182

"""

183

184

# Properties

185

@property

186

def connected(self):

187

"""bool: True if connection is established"""

188

189

@property

190

def connection(self):

191

"""Transport-specific connection object"""

192

193

@property

194

def default_channel(self):

195

"""Default channel (created on first access)"""

196

197

@property

198

def transport(self):

199

"""Transport instance"""

200

201

@property

202

def recoverable_connection_errors(self):

203

"""Tuple of recoverable connection error types"""

204

205

@property

206

def connection_errors(self):

207

"""Tuple of connection error types"""

208

```

209

210

### BrokerConnection Class

211

212

Legacy alias for Connection class provided for backward compatibility.

213

214

```python { .api }

215

BrokerConnection = Connection

216

```

217

218

**Note:** `BrokerConnection` is an alias for `Connection` and provides identical functionality. New code should use `Connection` directly.

219

220

### Connection Pooling

221

222

Global connection and producer pools for efficient resource management across applications.

223

224

```python { .api }

225

# Pool management functions

226

def get_limit():

227

"""

228

Get current connection pool limit.

229

230

Returns:

231

int: Current pool limit

232

"""

233

234

def set_limit(limit, force=False, reset_after=False, ignore_errors=False):

235

"""

236

Set new connection pool limit.

237

238

Parameters:

239

- limit (int): New pool limit

240

- force (bool): Force limit change

241

- reset_after (bool): Reset pools after change

242

- ignore_errors (bool): Ignore errors during reset

243

"""

244

245

def reset(*args, **kwargs):

246

"""

247

Reset all pools by closing resources.

248

249

Parameters:

250

- *args, **kwargs: Arguments passed to pool reset

251

"""

252

253

# Pool classes

254

class ProducerPool:

255

"""Pool of Producer instances"""

256

257

def acquire(self, block=False, timeout=None):

258

"""

259

Acquire producer from pool.

260

261

Parameters:

262

- block (bool): Block if pool empty

263

- timeout (float): Acquisition timeout

264

265

Returns:

266

Producer instance

267

"""

268

269

def release(self, resource):

270

"""

271

Release producer back to pool.

272

273

Parameters:

274

- resource: Producer to release

275

"""

276

277

class ConnectionPool:

278

"""Pool of Connection instances"""

279

280

def acquire(self, block=False, timeout=None):

281

"""

282

Acquire connection from pool.

283

284

Parameters:

285

- block (bool): Block if pool empty

286

- timeout (float): Acquisition timeout

287

288

Returns:

289

Connection instance

290

"""

291

292

def release(self, resource):

293

"""

294

Release connection back to pool.

295

296

Parameters:

297

- resource: Connection to release

298

"""

299

300

class ChannelPool:

301

"""Pool of Channel instances bound to connection"""

302

303

def acquire(self, block=False, timeout=None):

304

"""

305

Acquire channel from pool.

306

307

Parameters:

308

- block (bool): Block if pool empty

309

- timeout (float): Acquisition timeout

310

311

Returns:

312

Channel instance

313

"""

314

315

def release(self, resource):

316

"""

317

Release channel back to pool.

318

319

Parameters:

320

- resource: Channel to release

321

"""

322

323

# Global pool instances

324

connections: Connections # Global connection pool group

325

producers: Producers # Global producer pool group

326

```

327

328

### URL Parsing

329

330

Utility for parsing broker URLs into connection parameters.

331

332

```python { .api }

333

def parse_url(url):

334

"""

335

Parse URL into mapping of connection components.

336

337

Parameters:

338

- url (str): Broker URL to parse

339

340

Returns:

341

dict: Parsed URL components with keys:

342

- transport (str): Transport name

343

- hostname (str): Broker hostname

344

- port (int): Port number

345

- userid (str): Username

346

- password (str): Password

347

- virtual_host (str): Virtual host

348

"""

349

```

350

351

## Usage Examples

352

353

### Basic Connection

354

355

```python

356

from kombu import Connection

357

358

# Connect with URL

359

conn = Connection('redis://localhost:6379/0')

360

361

# Connect with parameters

362

conn = Connection(

363

hostname='localhost',

364

userid='guest',

365

password='guest',

366

virtual_host='/',

367

transport='pyamqp'

368

)

369

370

with conn:

371

# Use connection

372

channel = conn.channel()

373

# ... perform operations

374

```

375

376

### Connection with Retry Logic

377

378

```python

379

from kombu import Connection

380

381

conn = Connection('redis://localhost:6379/0')

382

383

def on_connection_error(exc, interval):

384

print(f"Connection error: {exc}, retrying in {interval}s")

385

386

# Ensure connection with custom retry policy

387

with conn.ensure_connection(

388

errback=on_connection_error,

389

max_retries=5,

390

interval_start=1,

391

interval_step=2,

392

interval_max=10

393

):

394

# Connection guaranteed to be established

395

channel = conn.channel()

396

```

397

398

### Connection Pooling

399

400

```python

401

from kombu import Connection, pools

402

403

# Use global connection pool (pool groups are keyed by Connection instances, not URL strings)

404

with pools.connections[Connection('redis://localhost:6379/0')].acquire() as conn:

405

producer = conn.Producer()

406

producer.publish({'msg': 'hello'}, routing_key='test')

407

408

# Use global producer pool (also keyed by a Connection instance)

409

with pools.producers[Connection('redis://localhost:6379/0')].acquire() as producer:

410

producer.publish({'msg': 'hello'}, routing_key='test')

411

412

# Set pool limits

413

pools.set_limit(100) # Allow up to 100 connections per pool

414

```

415

416

### URL Parsing

417

418

```python

419

from kombu.utils.url import parse_url

420

421

# Parse Redis URL

422

parsed = parse_url('redis://user:pass@localhost:6379/1')

423

print(parsed)

424

# {'transport': 'redis', 'hostname': 'localhost', 'port': 6379,

425

# 'userid': 'user', 'password': 'pass', 'virtual_host': '1'}

426

427

# Parse AMQP URL

428

parsed = parse_url('amqp://guest:guest@localhost:5672//')

429

print(parsed)

430

# {'transport': 'amqp', 'hostname': 'localhost', 'port': 5672,

431

# 'userid': 'guest', 'password': 'guest', 'virtual_host': '/'}

432

```

433

434

### Event Processing

435

436

```python

437

from kombu import Connection, Queue, Consumer

438

from kombu.common import eventloop

439

440

def process_message(body, message):

441

print(f"Processing: {body}")

442

message.ack()

443

444

conn = Connection('redis://localhost:6379/0')

445

queue = Queue('test_queue')

446

447

with conn:

448

consumer = conn.Consumer(queue, callbacks=[process_message])

449

consumer.consume()

450

451

# Process events with timeout

452

for _ in eventloop(conn, limit=10, timeout=5.0):

453

pass # Events processed via callbacks

454

```