0
# Connection Pooling
1
2
Thread-safe and simple connection pools for managing database connections in multi-threaded applications. Connection pooling reduces connection overhead, manages connection limits, and provides automatic connection recycling with configurable pool sizes.
3
4
## Capabilities
5
6
### Abstract Connection Pool
7
8
Base class providing core pooling functionality with connection management, recycling, and cleanup.
9
10
```python { .api }
11
class AbstractConnectionPool:
12
"""Generic key-based pooling code."""
13
14
def __init__(self, minconn, maxconn, *args, **kwargs):
15
"""
16
Initialize connection pool.
17
18
Parameters:
19
- minconn (int): Minimum number of connections to maintain
20
- maxconn (int): Maximum number of connections allowed
21
- *args: Arguments passed to psycopg2.connect()
22
- **kwargs: Keyword arguments passed to psycopg2.connect()
23
"""
24
25
def _connect(self, key=None):
26
"""
27
Create new connection.
28
29
Parameters:
30
- key: Optional connection key
31
32
Returns:
33
connection: New database connection
34
"""
35
36
def _getconn(self, key=None):
37
"""
38
Get connection from pool.
39
40
Parameters:
41
- key: Optional connection key
42
43
Returns:
44
connection: Available connection
45
"""
46
47
def _putconn(self, conn, key=None, close=False):
48
"""
49
Return connection to pool.
50
51
Parameters:
52
- conn (connection): Connection to return
53
- key: Connection key
54
- close (bool): Force close connection
55
"""
56
57
def _closeall(self):
58
"""Close all connections in pool."""
59
60
@property
61
def closed(self):
62
"""Pool closed status."""
63
64
@property
65
def minconn(self):
66
"""Minimum connections maintained."""
67
68
@property
69
def maxconn(self):
70
"""Maximum connections allowed."""
71
```
72
73
### Simple Connection Pool
74
75
Connection pool for single-threaded applications without locking mechanisms.
76
77
```python { .api }
78
class SimpleConnectionPool(AbstractConnectionPool):
79
"""Connection pool for single-threaded applications."""
80
81
def getconn(self, key=None):
82
"""
83
Get connection from pool.
84
85
Parameters:
86
- key: Optional connection identifier
87
88
Returns:
89
connection: Available database connection
90
91
Raises:
92
PoolError: If pool is closed or exhausted
93
"""
94
95
def putconn(self, conn, key=None, close=False):
96
"""
97
Return connection to pool.
98
99
Parameters:
100
- conn (connection): Connection to return
101
- key: Connection key (auto-detected if None)
102
- close (bool): Force close instead of recycling
103
104
Raises:
105
PoolError: If pool is closed or connection invalid
106
"""
107
108
def closeall(self):
109
"""
110
Close all connections in pool.
111
112
Raises:
113
PoolError: If pool already closed
114
"""
115
```
116
117
Usage examples:
118
119
```python
120
from psycopg2.pool import SimpleConnectionPool
121
122
# Create simple pool
123
pool = SimpleConnectionPool(
124
2, # minimum connections
125
10, # maximum connections
126
host="localhost",
127
database="mydb",
128
user="postgres",
129
password="secret"
130
)
131
132
# Get connection
133
conn = pool.getconn()
134
135
try:
136
# Use connection
137
cur = conn.cursor()
138
cur.execute("SELECT * FROM users")
139
users = cur.fetchall()
140
cur.close()
141
finally:
142
# Always return connection
143
pool.putconn(conn)
144
145
# Keyed connections
146
conn1 = pool.getconn("user_queries")
147
conn2 = pool.getconn("admin_queries")
148
149
# Return keyed connections
150
pool.putconn(conn1, "user_queries")
151
pool.putconn(conn2, "admin_queries")
152
153
# Close pool when done
154
pool.closeall()
155
```
156
157
### Threaded Connection Pool
158
159
Thread-safe connection pool with locking for multi-threaded applications.
160
161
```python { .api }
162
class ThreadedConnectionPool(AbstractConnectionPool):
163
"""Thread-safe connection pool."""
164
165
def __init__(self, minconn, maxconn, *args, **kwargs):
166
"""
167
Initialize threaded pool.
168
169
Parameters:
170
- minconn (int): Minimum connections to maintain
171
- maxconn (int): Maximum connections allowed
172
- *args: Arguments for psycopg2.connect()
173
- **kwargs: Keyword arguments for psycopg2.connect()
174
"""
175
176
def getconn(self, key=None):
177
"""
178
Thread-safe get connection from pool.
179
180
Parameters:
181
- key: Optional connection identifier
182
183
Returns:
184
connection: Available database connection
185
186
Raises:
187
PoolError: If pool is closed or exhausted
188
"""
189
190
def putconn(self, conn=None, key=None, close=False):
191
"""
192
Thread-safe return connection to pool.
193
194
Parameters:
195
- conn (connection, optional): Connection to return
196
- key: Connection key (auto-detected if conn is None)
197
- close (bool): Force close instead of recycling
198
199
Raises:
200
PoolError: If pool is closed or connection invalid
201
"""
202
203
def closeall(self):
204
"""
205
Thread-safe close all connections.
206
207
Raises:
208
PoolError: If pool already closed
209
"""
210
```
211
212
Usage examples:
213
214
```python
215
from psycopg2.pool import ThreadedConnectionPool
216
import threading
217
218
# Create thread-safe pool
219
pool = ThreadedConnectionPool(
220
1, # minimum connections
221
20, # maximum connections
222
host="localhost",
223
database="mydb",
224
user="postgres",
225
password="secret"
226
)
227
228
def worker_function(worker_id):
229
"""Worker function for threading example."""
230
conn = None
231
try:
232
# Get connection (thread-safe)
233
conn = pool.getconn()
234
235
# Use connection
236
cur = conn.cursor()
237
cur.execute("SELECT COUNT(*) FROM large_table WHERE worker_id = %s",
238
(worker_id,))
239
count = cur.fetchone()[0]
240
cur.close()
241
242
print(f"Worker {worker_id}: {count} records")
243
244
finally:
245
if conn:
246
# Return connection (thread-safe)
247
pool.putconn(conn)
248
249
# Create multiple threads
250
threads = []
251
for i in range(5):
252
thread = threading.Thread(target=worker_function, args=(i,))
253
threads.append(thread)
254
thread.start()
255
256
# Wait for completion
257
for thread in threads:
258
thread.join()
259
260
# Cleanup
261
pool.closeall()
262
```
263
264
### Pool Context Managers
265
266
Connection pools support context manager protocol for automatic cleanup.
267
268
```python
269
# Pool context manager
270
with ThreadedConnectionPool(1, 10, **conn_params) as pool:
271
conn = pool.getconn()
272
try:
273
# Use connection
274
cur = conn.cursor()
275
cur.execute("SELECT 1")
276
result = cur.fetchone()
277
finally:
278
pool.putconn(conn)
279
# Pool automatically closed
280
281
# Connection context manager with pool
282
def get_pooled_connection():
283
conn = pool.getconn()
284
try:
285
yield conn
286
finally:
287
pool.putconn(conn)
288
289
# Usage
290
with get_pooled_connection() as conn:
291
cur = conn.cursor()
292
cur.execute("SELECT * FROM users")
293
users = cur.fetchall()
294
```
295
296
### Advanced Pool Management
297
298
Connection pools provide advanced features for monitoring and management.
299
300
```python
301
# Pool status monitoring
302
print(f"Pool closed: {pool.closed}")
303
print(f"Min connections: {pool.minconn}")
304
print(f"Max connections: {pool.maxconn}")
305
306
# Force close connections
307
conn = pool.getconn()
308
pool.putconn(conn, close=True) # Force close instead of recycling
309
310
# Connection validation and recycling
311
# Pools automatically handle:
312
# - Connection validation before reuse
313
# - Transaction rollback on return
314
# - Connection replacement for closed connections
315
# - Automatic cleanup of idle connections
316
```
317
318
### Pool Error Handling
319
320
Comprehensive error handling for pool operations.
321
322
```python
323
from psycopg2.pool import PoolError
324
325
try:
326
conn = pool.getconn()
327
# Use connection
328
except PoolError as e:
329
if "exhausted" in str(e):
330
print("No connections available - consider increasing maxconn")
331
elif "closed" in str(e):
332
print("Pool has been closed")
333
else:
334
print(f"Pool error: {e}")
335
336
# Handling connection failures
337
try:
338
pool.putconn(invalid_conn)
339
except PoolError as e:
340
print(f"Cannot return connection: {e}")
341
```
342
343
### Custom Connection Factories with Pools
344
345
Use custom connection classes with connection pools.
346
347
```python
348
from psycopg2.extras import DictConnection
349
350
# Pool with custom connection factory
351
class DictConnectionPool(ThreadedConnectionPool):
352
def _connect(self, key=None):
353
"""Create connection with DictConnection factory."""
354
conn = psycopg2.connect(*self._args,
355
connection_factory=DictConnection,
356
**self._kwargs)
357
if key is not None:
358
self._used[key] = conn
359
self._rused[id(conn)] = key
360
else:
361
self._pool.append(conn)
362
return conn
363
364
# Use custom pool
365
dict_pool = DictConnectionPool(1, 10, **conn_params)
366
conn = dict_pool.getconn() # Returns DictConnection
367
cur = conn.cursor() # Returns DictCursor by default
368
```
369
370
## Types
371
372
### Pool Exceptions
373
374
```python { .api }
375
class PoolError(psycopg2.Error):
376
"""Exception raised by connection pool operations."""
377
pass
378
```
379
380
### Pool Configuration
381
382
```python { .api }
383
PoolConfig = {
384
'minconn': int, # Minimum connections to maintain
385
'maxconn': int, # Maximum connections allowed
386
'host': str, # Database host
387
'port': int, # Database port
388
'database': str, # Database name
389
'user': str, # Username
390
'password': str, # Password
391
# ... other psycopg2.connect() parameters
392
}
393
```
394
395
### Pool State
396
397
```python { .api }
398
PoolState = {
399
'closed': bool, # Pool closed status
400
'minconn': int, # Configured minimum connections
401
'maxconn': int, # Configured maximum connections
402
'_pool': list, # Available connections
403
'_used': dict, # Connections in use (key -> connection)
404
'_rused': dict, # Reverse mapping (connection_id -> key)
405
'_keys': int # Key counter
406
}
407
```
408
409
### Thread Safety
410
411
```python
412
# ThreadedConnectionPool uses threading.Lock for:
413
# - getconn() operations
414
# - putconn() operations
415
# - closeall() operations
416
# - Internal pool state management
417
418
# SimpleConnectionPool provides no locking:
419
# - Suitable for single-threaded applications
420
# - Higher performance due to no locking overhead
421
# - Must not be shared between threads
422
```