0
# Connection Management
1
2
Robust connection handling with pooling, retry logic, and failover support for connecting to message brokers across multiple transport backends. Kombu's connection management provides automatic reconnection, resource pooling, and comprehensive error handling for reliable messaging operations.
3
4
## Capabilities
5
6
### Connection Class
7
8
Primary connection class for establishing and managing broker connections with support for automatic reconnection, channel management, and transport abstraction.
9
10
```python { .api }
11
class Connection:
12
def __init__(self, hostname='localhost', userid=None, password=None, virtual_host=None, port=None, insist=False, ssl=False, transport=None, connect_timeout=5, transport_options=None, login_method=None, uri_prefix=None, heartbeat=0, failover_strategy='round-robin', alternates=None, **kwargs):
13
"""
14
Create connection to message broker.
15
16
Parameters:
17
- hostname (str): Broker hostname or a full broker URL such as 'redis://localhost:6379/0' (default 'localhost')
18
- userid (str): Username for authentication
19
- password (str): Password for authentication
20
- virtual_host (str): Virtual host (AMQP concept)
21
- port (int): Broker port number
22
- insist (bool): Insist on connection (deprecated)
23
- ssl (bool|dict): SSL configuration
24
- transport (str): Transport backend name
25
- connect_timeout (float): Connection timeout in seconds
26
- transport_options (dict): Transport-specific options
27
- login_method (str): SASL login method
28
- uri_prefix (str): URI prefix for transport
29
- heartbeat (int): Heartbeat interval in seconds (0=disabled)
30
- failover_strategy (str): Strategy for multiple hosts
31
- alternates (list): Alternative broker URLs
32
- **kwargs: Additional connection parameters
33
"""
34
35
def connect(self):
36
"""
37
Establish connection to the broker immediately.
38
39
Returns:
40
The established underlying (transport-specific) connection object
41
"""
42
43
def channel(self):
44
"""
45
Create and return a new channel.
46
47
Returns:
48
Channel instance
49
"""
50
51
def drain_events(self, timeout=None):
52
"""
53
Wait for a single event from the server.
54
55
Parameters:
56
- timeout (float): Timeout in seconds
57
58
Returns:
59
Event data or raises socket.timeout
60
"""
61
62
def ensure_connection(self, errback=None, max_retries=None, interval_start=2, interval_step=2, interval_max=30):
63
"""
64
Ensure connection is established with retry logic.
65
66
Parameters:
67
- errback (callable): Error callback function
68
- max_retries (int): Maximum retry attempts
69
- interval_start (float): Initial retry interval
70
- interval_step (float): Interval increase per retry
71
- interval_max (float): Maximum retry interval
72
73
Returns:
74
The Connection instance itself, with the connection established (Connection supports the `with` statement)
75
"""
76
77
def ensure(self, obj, fun, errback=None, max_retries=None, **retry_policy):
78
"""
79
Ensure operation completes despite connection errors.
80
81
Parameters:
82
- obj: Object to call method on
83
- fun (callable): Method of obj to apply (e.g. producer.publish)
84
- errback (callable): Error callback
85
- max_retries (int): Maximum retries
86
- retry_policy: Additional retry parameters
87
88
Returns:
89
Wrapped callable; calling it invokes fun with retry handling and returns fun's result
90
"""
91
92
def heartbeat_check(self, rate=2):
93
"""
94
Check heartbeats at specified rate.
95
96
Parameters:
97
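- rate (int): How many times this check is called per heartbeat interval (e.g. with a 3 second heartbeat and a call every 1.5 seconds, rate is 2)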
98
"""
99
100
def close(self):
101
"""Close the connection and cleanup resources."""
102
103
def release(self):
104
"""Release connection back to pool."""
105
106
# Connection factory methods
107
def Pool(self, limit=None, preload=None):
108
"""
109
Create connection pool.
110
111
Parameters:
112
- limit (int): Maximum pool size
113
- preload (int): Number of connections to preload
114
115
Returns:
116
ConnectionPool instance
117
"""
118
119
def ChannelPool(self, limit=None, preload=None):
120
"""
121
Create channel pool.
122
123
Parameters:
124
- limit (int): Maximum pool size
125
- preload (int): Number of channels to preload
126
127
Returns:
128
ChannelPool instance
129
"""
130
131
def Producer(self, channel=None, *args, **kwargs):
132
"""
133
Create Producer instance.
134
135
Parameters:
136
- channel: Channel to use (uses default_channel if None)
137
138
Returns:
139
Producer instance
140
"""
141
142
def Consumer(self, queues, channel=None, *args, **kwargs):
143
"""
144
Create Consumer instance.
145
146
Parameters:
147
- queues: Queues to consume from
148
- channel: Channel to use (uses default_channel if None)
149
150
Returns:
151
Consumer instance
152
"""
153
154
def SimpleQueue(self, name, no_ack=None, queue_opts=None, exchange_opts=None, channel=None, **kwargs):
155
"""
156
Create SimpleQueue instance.
157
158
Parameters:
159
- name (str): Queue name
160
- no_ack (bool): Disable acknowledgments
161
- queue_opts (dict): Queue options
162
- exchange_opts (dict): Exchange options
163
- channel: Channel to use
164
165
Returns:
166
SimpleQueue instance
167
"""
168
169
def SimpleBuffer(self, name, no_ack=True, queue_opts=None, exchange_opts=None, channel=None, **kwargs):
170
"""
171
Create SimpleBuffer instance (ephemeral queue).
172
173
Parameters:
174
- name (str): Queue name
175
- no_ack (bool): Disable acknowledgments (default True)
176
- queue_opts (dict): Queue options
177
- exchange_opts (dict): Exchange options
178
- channel: Channel to use
179
180
Returns:
181
SimpleBuffer instance
182
"""
183
184
# Properties
185
@property
186
def connected(self):
187
"""bool: True if connection is established"""
188
189
@property
190
def connection(self):
191
"""Transport-specific connection object"""
192
193
@property
194
def default_channel(self):
195
"""Default channel (created on first access)"""
196
197
@property
198
def transport(self):
199
"""Transport instance"""
200
201
@property
202
def recoverable_connection_errors(self):
203
"""Tuple of recoverable connection error types"""
204
205
@property
206
def connection_errors(self):
207
"""Tuple of connection error types"""
208
```
209
210
### BrokerConnection Class
211
212
Legacy alias for Connection class provided for backward compatibility.
213
214
```python { .api }
215
BrokerConnection = Connection
216
```
217
218
**Note:** `BrokerConnection` is an alias for `Connection` and provides identical functionality. New code should use `Connection` directly.
219
220
### Connection Pooling
221
222
Global connection and producer pools for efficient resource management across applications.
223
224
```python { .api }
225
# Pool management functions
226
def get_limit():
227
"""
228
Get current connection pool limit.
229
230
Returns:
231
int: Current pool limit
232
"""
233
234
def set_limit(limit, force=False, reset_after=False, ignore_errors=False):
235
"""
236
Set new connection pool limit.
237
238
Parameters:
239
- limit (int): New pool limit
240
- force (bool): Force limit change
241
- reset_after (bool): Reset pools after change
242
- ignore_errors (bool): Ignore errors during reset
243
"""
244
245
def reset(*args, **kwargs):
246
"""
247
Reset all pools by closing resources.
248
249
Parameters:
250
- *args, **kwargs: Arguments passed to pool reset
251
"""
252
253
# Pool classes
254
class ProducerPool:
255
"""Pool of Producer instances"""
256
257
def acquire(self, block=False, timeout=None):
258
"""
259
Acquire producer from pool.
260
261
Parameters:
262
- block (bool): Block if pool empty
263
- timeout (float): Acquisition timeout
264
265
Returns:
266
Producer instance
267
"""
268
269
def release(self, resource):
270
"""
271
Release producer back to pool.
272
273
Parameters:
274
- resource: Producer to release
275
"""
276
277
class ConnectionPool:
278
"""Pool of Connection instances"""
279
280
def acquire(self, block=False, timeout=None):
281
"""
282
Acquire connection from pool.
283
284
Parameters:
285
- block (bool): Block if pool empty
286
- timeout (float): Acquisition timeout
287
288
Returns:
289
Connection instance
290
"""
291
292
def release(self, resource):
293
"""
294
Release connection back to pool.
295
296
Parameters:
297
- resource: Connection to release
298
"""
299
300
class ChannelPool:
301
"""Pool of Channel instances bound to connection"""
302
303
def acquire(self, block=False, timeout=None):
304
"""
305
Acquire channel from pool.
306
307
Parameters:
308
- block (bool): Block if pool empty
309
- timeout (float): Acquisition timeout
310
311
Returns:
312
Channel instance
313
"""
314
315
def release(self, resource):
316
"""
317
Release channel back to pool.
318
319
Parameters:
320
- resource: Channel to release
321
"""
322
323
# Global pool instances
324
connections: Connections # Global connection pool group
325
producers: Producers # Global producer pool group
326
```
327
328
### URL Parsing
329
330
Utility for parsing broker URLs into connection parameters.
331
332
```python { .api }
333
def parse_url(url):
334
"""
335
Parse URL into mapping of connection components.
336
337
Parameters:
338
- url (str): Broker URL to parse
339
340
Returns:
341
dict: Parsed URL components with keys:
342
- transport (str): Transport name
343
- hostname (str): Broker hostname
344
- port (int): Port number
345
- userid (str): Username
346
- password (str): Password
347
- virtual_host (str): Virtual host
348
"""
349
```
350
351
## Usage Examples
352
353
### Basic Connection
354
355
```python
356
from kombu import Connection
357
358
# Connect with URL
359
conn = Connection('redis://localhost:6379/0')
360
361
# Connect with parameters
362
conn = Connection(
363
hostname='localhost',
364
userid='guest',
365
password='guest',
366
virtual_host='/',
367
transport='pyamqp'
368
)
369
370
with conn:
371
# Use connection
372
channel = conn.channel()
373
# ... perform operations
374
```
375
376
### Connection with Retry Logic
377
378
```python
379
from kombu import Connection
380
381
conn = Connection('redis://localhost:6379/0')
382
383
def on_connection_error(exc, interval):
384
print(f"Connection error: {exc}, retrying in {interval}s")
385
386
# Ensure connection with custom retry policy
387
with conn.ensure_connection(
388
errback=on_connection_error,
389
max_retries=5,
390
interval_start=1,
391
interval_step=2,
392
interval_max=10
393
):
394
# Connection guaranteed to be established
395
channel = conn.channel()
396
```
397
398
### Connection Pooling
399
400
```python
401
from kombu import Connection, pools
401
402
# Pool groups are keyed by Connection instances, not URL strings
connection = Connection('redis://localhost:6379/0')
# Use global connection pool
403
with pools.connections[connection].acquire(block=True) as conn:
404
producer = conn.Producer()
405
producer.publish({'msg': 'hello'}, routing_key='test')
406
407
# Use global producer pool
408
with pools.producers[connection].acquire(block=True) as producer:
410
producer.publish({'msg': 'hello'}, routing_key='test')
411
412
# Set pool limits
413
pools.set_limit(100) # Allow up to 100 connections per pool
414
```
415
416
### URL Parsing
417
418
```python
419
from kombu.utils.url import parse_url
420
421
# Parse Redis URL
422
parsed = parse_url('redis://user:pass@localhost:6379/1')
423
print(parsed)
424
# {'transport': 'redis', 'hostname': 'localhost', 'port': 6379,
425
# 'userid': 'user', 'password': 'pass', 'virtual_host': '1'}
426
427
# Parse AMQP URL
428
parsed = parse_url('amqp://guest:guest@localhost:5672//')
429
print(parsed)
430
# {'transport': 'amqp', 'hostname': 'localhost', 'port': 5672,
431
# 'userid': 'guest', 'password': 'guest', 'virtual_host': '/'}
432
```
433
434
### Event Processing
435
436
```python
437
from kombu import Connection, Queue, Consumer
438
from kombu.common import eventloop
439
440
def process_message(body, message):
441
print(f"Processing: {body}")
442
message.ack()
443
444
conn = Connection('redis://localhost:6379/0')
445
queue = Queue('test_queue')
446
447
with conn:
448
consumer = conn.Consumer(queue, callbacks=[process_message])
449
consumer.consume()
450
451
# Process events with timeout
452
for _ in eventloop(conn, limit=10, timeout=5.0):
453
pass # Events processed via callbacks
454
```