# Connection and Producer Pooling

Kombu pools connections and producers so that high-throughput applications can reuse them instead of creating new ones. Connection pools avoid the overhead of establishing a connection for each operation, while producer pools keep ready-to-use producer instances available.

## Core Imports

```python
from kombu import pools
from kombu.pools import connections, producers, ProducerPool, PoolGroup
from kombu.pools import get_limit, set_limit, reset
```

## Capabilities

### Global Pool Instances

Pre-configured global pools for connections and producers that can be used throughout an application.

```python { .api }
connections: Connections  # Global connection pool group
producers: Producers  # Global producer pool group
```

**Usage Example:**

```python
from kombu import Connection, pools

# Pool groups are keyed by Connection instances, not URL strings
connection = Connection('redis://localhost:6379/0')

# Use global connection pool
with pools.connections[connection].acquire() as conn:
    # Use connection
    with conn.channel() as channel:
        # Operate on channel
        pass

# Use global producer pool
with pools.producers[connection].acquire() as producer:
    producer.publish({'message': 'data'}, routing_key='task')
```

### ProducerPool Class

Pool of Producer instances that share connections from a connection pool.

```python { .api }
class ProducerPool:
    """Pool of kombu.Producer instances."""

    def __init__(self, connections, *args, **kwargs):
        """
        Initialize producer pool.

        Parameters:
        - connections: Connection pool to use for producers
        - Producer: Producer class to use (default: kombu.Producer)
        - *args, **kwargs: Passed to Resource base class
        """

    def acquire(self, block=False, timeout=None):
        """
        Acquire producer from pool.

        Parameters:
        - block (bool): Whether to block if no resource available
        - timeout (float): Timeout for blocking acquire

        Returns:
        Producer instance from pool
        """

    def release(self, resource):
        """
        Release producer back to pool.

        Parameters:
        - resource: Producer instance to release
        """

    def create_producer(self):
        """Create new producer instance."""

    def prepare(self, producer):
        """Prepare producer for use."""

    def close_resource(self, resource):
        """Close producer resource."""
```
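
The `acquire`/`release` contract described above can be illustrated with a small standard-library sketch. This is not kombu's implementation: `BoundedPool` and `PoolExhausted` are hypothetical names, and `PoolExhausted` only stands in for the error a full pool raises when nothing is free.

```python
import queue
from contextlib import contextmanager


class PoolExhausted(Exception):
    """Hypothetical stand-in for a 'pool limit exceeded' error."""


class BoundedPool:
    """Toy pool that hands out at most `limit` resources at a time."""

    def __init__(self, factory, limit):
        self._free = queue.LifoQueue()
        for _ in range(limit):
            self._free.put(factory())

    @contextmanager
    def acquire(self, block=False, timeout=None):
        try:
            # With block=False the timeout is ignored and this fails at once.
            resource = self._free.get(block=block, timeout=timeout)
        except queue.Empty:
            raise PoolExhausted("no free resource") from None
        try:
            yield resource
        finally:
            self._free.put(resource)  # always hand the resource back


pool = BoundedPool(factory=object, limit=1)
exhausted = False
with pool.acquire() as first:
    try:
        with pool.acquire():  # the only resource is already in use
            pass
    except PoolExhausted:
        exhausted = True
```

Once the outer `with` block exits, the resource is back in the pool and the next `acquire()` succeeds.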

### PoolGroup Class

Base class for managing collections of resource pools, automatically creating pools for different resources as needed.

```python { .api }
class PoolGroup:
    """Collection of resource pools."""

    def __init__(self, limit=None, close_after_fork=True):
        """
        Initialize pool group.

        Parameters:
        - limit (int): Default limit for created pools
        - close_after_fork (bool): Whether to close pools after fork
        """

    def create(self, resource, limit):
        """
        Create pool for resource (must be implemented by subclasses).

        Parameters:
        - resource: Resource to create pool for
        - limit (int): Pool size limit

        Returns:
        Pool instance for resource
        """

    def __getitem__(self, resource):
        """Get or create pool for resource."""

    def __missing__(self, resource):
        """Create new pool when resource not found."""
```
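
The get-or-create behaviour of `__getitem__` and `__missing__` can be sketched with a plain `dict` subclass. `ToyPoolGroup` and `DictPools` are hypothetical names used only for illustration, and the "pool" here is just a dictionary; the sketch assumes nothing about kombu beyond the method names listed above.

```python
class ToyPoolGroup(dict):
    """Toy pool group: creates a pool the first time a key is looked up."""

    def __init__(self, limit=None):
        super().__init__()
        self.limit = limit

    def create(self, resource, limit):
        raise NotImplementedError("subclasses must implement create()")

    def __missing__(self, resource):
        # dict.__getitem__ calls this only when the key is absent,
        # so each resource gets exactly one pool.
        pool = self[resource] = self.create(resource, self.limit)
        return pool


class DictPools(ToyPoolGroup):
    def create(self, resource, limit):
        return {"resource": resource, "limit": limit}


group = DictPools(limit=3)
pool_a = group["broker-a"]        # created on first access
same_pool = group["broker-a"]     # later lookups reuse the stored pool
```

The design keeps pool creation in one overridable method, which is why the concrete groups only need to implement `create`.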

### Concrete Pool Group Classes

Specific implementations of PoolGroup for connections and producers.

```python { .api }
class Connections(PoolGroup):
    """Collection of connection pools."""

    def create(self, connection, limit):
        """Create connection pool for given connection."""

class Producers(PoolGroup):
    """Collection of producer pools."""

    def create(self, connection, limit):
        """Create producer pool for given connection."""
```

### Pool Management Functions

Global functions for managing pool limits and state.

```python { .api }
def get_limit() -> int:
    """Get current global connection pool limit."""

def set_limit(limit: int, force=False, reset_after=False, ignore_errors=False) -> int:
    """
    Set new global connection pool limit.

    Parameters:
    - limit (int): New pool size limit
    - force (bool): Force limit change even if same as current
    - reset_after (bool): Reset pools after setting limit
    - ignore_errors (bool): Ignore errors during pool resizing

    Returns:
    The set limit value
    """

def reset(*args, **kwargs):
    """Reset all pools by closing open resources and clearing groups."""

def register_group(group) -> PoolGroup:
    """Register pool group for management (can be used as decorator)."""
```

## Advanced Usage

### Custom Producer Pool

```python
from kombu.pools import ProducerPool
from kombu import Connection

# Create custom producer pool
conn_pool = Connection('redis://localhost:6379/0').Pool(limit=10)
producer_pool = ProducerPool(conn_pool, limit=5)

# Use custom pool
with producer_pool.acquire() as producer:
    producer.publish({'custom': 'message'}, routing_key='custom')
```

### Pool Limit Management

```python
from kombu.pools import get_limit, set_limit, reset

# Check current limit
current = get_limit()
print(f"Current pool limit: {current}")

# Set new limit
set_limit(20)

# Reset all pools (useful for testing or cleanup)
reset()
```

### Context Manager Usage

```python
from kombu import Connection
from kombu.pools import connections, producers

# Pools are looked up by Connection instance, not by URL string
connection = Connection('redis://localhost:6379/0')

# Connection pool context manager
with connections[connection].acquire() as conn:
    # Connection is automatically released when exiting context
    with conn.channel() as channel:
        # Use channel
        pass

# Producer pool context manager
with producers[connection].acquire() as producer:
    # Producer is automatically released when exiting context
    producer.publish({'msg': 'data'}, routing_key='task')
```

## Error Handling

Pool operations can raise exceptions such as `LimitExceeded` (no resource became available) and `OperationalError` (connection failure):

```python
from kombu import Connection
from kombu.pools import producers
from kombu.exceptions import LimitExceeded, OperationalError

connection = Connection('redis://localhost:6379/0')

try:
    # The timeout only takes effect when block=True
    with producers[connection].acquire(block=True, timeout=5.0) as producer:
        producer.publish({'data': 'message'}, routing_key='task')
except LimitExceeded:
    print("Pool limit exceeded, no resources available")
except OperationalError as e:
    print(f"Connection error: {e}")
```
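
When an exhausted pool is expected to free up shortly, a caller may prefer to retry rather than fail immediately. The helper below is a minimal sketch of that idea using only the standard library; `call_with_retry`, `FakeLimitExceeded`, and `flaky_publish` are hypothetical names, and the linear backoff is just one possible policy.

```python
import time


def call_with_retry(operation, retry_on, attempts=3, delay=0.05):
    """Call operation(); on a retry_on error, wait and try again."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            time.sleep(delay * attempt)  # linear backoff between attempts


class FakeLimitExceeded(Exception):
    """Hypothetical stand-in for a pool-exhausted error."""


calls = []


def flaky_publish():
    """Fails on the first two calls, then succeeds."""
    calls.append(1)
    if len(calls) < 3:
        raise FakeLimitExceeded("pool busy")
    return "published"


result = call_with_retry(flaky_publish, retry_on=FakeLimitExceeded)
```

In an application, `operation` would presumably wrap the `acquire` and `publish` calls shown above, with `retry_on` set to `LimitExceeded`.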