0
# Connection Pooling
1
2
Thread-safe and non-thread-safe connection pools for managing database connections efficiently in multi-threaded applications, reducing connection overhead and improving performance.
3
4
## Capabilities
5
6
### Pool Exception Handling
7
8
Specialized exception for pool-related errors.
9
10
```python { .api }
11
class PoolError(psycopg2.Error):
12
"""Pool-related errors."""
13
```
14
15
### Abstract Connection Pool
16
17
Base class for all connection pool implementations with core pooling functionality.
18
19
```python { .api }
20
class AbstractConnectionPool:
21
"""Base connection pool class."""
22
23
def __init__(self, minconn, maxconn, *args, **kwargs):
24
"""
25
Initialize pool.
26
27
Parameters:
28
- minconn (int): Minimum connections to maintain
29
- maxconn (int): Maximum connections allowed
30
- *args: Connection arguments passed to psycopg2.connect()
31
- **kwargs: Connection keyword arguments
32
"""
33
34
@property
35
def minconn(self):
36
"""Minimum connections to maintain."""
37
38
@property
39
def maxconn(self):
40
"""Maximum connections allowed."""
41
42
@property
43
def closed(self):
44
"""Pool closed status."""
45
46
def _connect(self, key=None):
47
"""
48
Create new connection (internal).
49
50
Parameters:
51
- key: Connection key for identification
52
53
Returns:
54
connection: New database connection
55
"""
56
57
def _getkey(self):
58
"""Generate unique key (internal)."""
59
60
def _getconn(self, key=None):
61
"""
62
Get connection (internal).
63
64
Parameters:
65
- key: Connection key
66
67
Returns:
68
connection: Database connection
69
"""
70
71
def _putconn(self, conn, key=None, close=False):
72
"""
73
Return connection (internal).
74
75
Parameters:
76
- conn: Connection to return
77
- key: Connection key
78
- close (bool): Force close connection
79
"""
80
81
def _closeall(self):
82
"""Close all connections (internal)."""
83
```
84
85
### Simple Connection Pool
86
87
Non-thread-safe connection pool for single-threaded applications with direct access to pool methods.
88
89
```python { .api }
90
class SimpleConnectionPool(AbstractConnectionPool):
91
"""Non-threadsafe connection pool."""
92
93
def __init__(self, minconn, maxconn, *args, **kwargs):
94
"""
95
Initialize simple pool.
96
97
Parameters:
98
- minconn (int): Minimum connections (1-maxconn)
99
- maxconn (int): Maximum connections
100
- *args: Connection arguments
101
- **kwargs: Connection keyword arguments
102
"""
103
104
def getconn(self, key=None):
105
"""
106
Get connection from pool.
107
108
Parameters:
109
- key: Optional connection key for tracking
110
111
Returns:
112
connection: Database connection
113
114
Raises:
115
PoolError: If no connections available
116
"""
117
118
def putconn(self, conn=None, key=None, close=False):
119
"""
120
Return connection to pool.
121
122
Parameters:
123
- conn: Connection to return (if None, uses key)
124
- key: Connection key
125
- close (bool): Force close connection instead of pooling
126
"""
127
128
def closeall(self):
129
"""Close all connections and reset pool."""
130
```
131
132
**Usage Example:**
133
134
```python
135
import psycopg2
136
from psycopg2.pool import SimpleConnectionPool
137
138
# Create connection pool
139
pool = SimpleConnectionPool(
140
minconn=1,
141
maxconn=5,
142
host="localhost",
143
database="mydb",
144
user="myuser",
145
password="mypass"
146
)
147
148
# Get connection from pool
149
conn = pool.getconn()
150
151
try:
152
with conn.cursor() as cur:
153
cur.execute("SELECT * FROM users")
154
users = cur.fetchall()
155
156
# Connection is still open, can be reused
157
with conn.cursor() as cur:
158
cur.execute("INSERT INTO logs (message) VALUES (%s)", ("User query",))
159
conn.commit()
160
161
finally:
162
# Return connection to pool (don't close it)
163
pool.putconn(conn)
164
165
# Get another connection (might be the same one)
166
conn2 = pool.getconn()
167
with conn2.cursor() as cur:
168
cur.execute("SELECT COUNT(*) FROM users")
169
count = cur.fetchone()[0]
170
print(f"Total users: {count}")
171
172
pool.putconn(conn2)
173
174
# Clean up - close all connections
175
pool.closeall()
176
```
177
178
### Threaded Connection Pool
179
180
Thread-safe connection pool for multi-threaded applications with automatic locking for concurrent access.
181
182
```python { .api }
183
class ThreadedConnectionPool(AbstractConnectionPool):
184
"""Thread-safe connection pool."""
185
186
def __init__(self, minconn, maxconn, *args, **kwargs):
187
"""
188
Initialize threaded pool with locking.
189
190
Parameters:
191
- minconn (int): Minimum connections (1-maxconn)
192
- maxconn (int): Maximum connections
193
- *args: Connection arguments
194
- **kwargs: Connection keyword arguments
195
"""
196
197
def getconn(self, key=None):
198
"""
199
Get connection (thread-safe).
200
201
Parameters:
202
- key: Optional connection key for tracking
203
204
Returns:
205
connection: Database connection
206
207
Raises:
208
PoolError: If no connections available
209
"""
210
211
def putconn(self, conn=None, key=None, close=False):
212
"""
213
Return connection (thread-safe).
214
215
Parameters:
216
- conn: Connection to return (if None, uses key)
217
- key: Connection key
218
- close (bool): Force close connection
219
"""
220
221
def closeall(self):
222
"""Close all connections (thread-safe)."""
223
```
224
225
**Usage Example:**
226
227
```python
228
import psycopg2
229
import threading
230
import time
231
from psycopg2.pool import ThreadedConnectionPool
232
233
# Create thread-safe pool
234
pool = ThreadedConnectionPool(
235
minconn=2,
236
maxconn=10,
237
host="localhost",
238
database="mydb",
239
user="myuser",
240
password="mypass"
241
)
242
243
def worker_function(worker_id):
244
"""Worker function for multi-threading."""
245
try:
246
# Get connection (thread-safe)
247
conn = pool.getconn()
248
print(f"Worker {worker_id} got connection")
249
250
with conn.cursor() as cur:
251
# Simulate work
252
cur.execute("SELECT pg_sleep(1)")
253
cur.execute("SELECT %s as worker_id", (worker_id,))
254
result = cur.fetchone()
255
print(f"Worker {worker_id} completed: {result[0]}")
256
257
except Exception as e:
258
print(f"Worker {worker_id} error: {e}")
259
finally:
260
# Always return connection to pool
261
pool.putconn(conn)
262
print(f"Worker {worker_id} returned connection")
263
264
# Create multiple threads
265
threads = []
266
for i in range(5):
267
thread = threading.Thread(target=worker_function, args=(i,))
268
threads.append(thread)
269
thread.start()
270
271
# Wait for all threads to complete
272
for thread in threads:
273
thread.join()
274
275
print("All workers completed")
276
pool.closeall()
277
```
278
279
### Pool Configuration and Management
280
281
Advanced pool configuration and monitoring patterns.
282
283
**Usage Example:**
284
285
```python
286
import psycopg2
287
from psycopg2.pool import ThreadedConnectionPool, PoolError
288
import contextlib
289
import logging
290
291
# Configure logging
292
logging.basicConfig(level=logging.INFO)
293
logger = logging.getLogger(__name__)
294
295
class ManagedConnectionPool:
296
"""Wrapper for enhanced pool management."""
297
298
def __init__(self, minconn, maxconn, **db_params):
299
self.pool = ThreadedConnectionPool(minconn, maxconn, **db_params)
300
self.logger = logger
301
302
@contextlib.contextmanager
303
def get_connection(self):
304
"""Context manager for automatic connection handling."""
305
conn = None
306
try:
307
conn = self.pool.getconn()
308
self.logger.info("Connection acquired from pool")
309
yield conn
310
except PoolError as e:
311
self.logger.error(f"Pool error: {e}")
312
raise
313
except Exception as e:
314
if conn:
315
conn.rollback()
316
self.logger.error(f"Database error: {e}")
317
raise
318
finally:
319
if conn:
320
self.pool.putconn(conn)
321
self.logger.info("Connection returned to pool")
322
323
def execute_query(self, query, params=None):
324
"""Execute query with automatic connection management."""
325
with self.get_connection() as conn:
326
with conn.cursor() as cur:
327
cur.execute(query, params)
328
if cur.description: # SELECT query
329
return cur.fetchall()
330
else: # INSERT/UPDATE/DELETE
331
conn.commit()
332
return cur.rowcount
333
334
def close(self):
335
"""Close all pool connections."""
336
self.pool.closeall()
337
self.logger.info("Pool closed")
338
339
# Usage
340
managed_pool = ManagedConnectionPool(
341
minconn=2,
342
maxconn=8,
343
host="localhost",
344
database="mydb",
345
user="myuser",
346
password="mypass"
347
)
348
349
try:
350
# Simple query execution
351
users = managed_pool.execute_query("SELECT id, name FROM users LIMIT 5")
352
for user in users:
353
print(f"User: {user[1]}")
354
355
# Insert with parameters
356
rows_affected = managed_pool.execute_query(
357
"INSERT INTO logs (message, level) VALUES (%s, %s)",
358
("Application started", "INFO")
359
)
360
print(f"Inserted {rows_affected} rows")
361
362
# Using context manager directly
363
with managed_pool.get_connection() as conn:
364
with conn.cursor() as cur:
365
cur.execute("BEGIN")
366
cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
367
cur.execute("INSERT INTO profiles (user_id, bio) VALUES (currval('users_id_seq'), %s)", ("Bio",))
368
conn.commit()
369
370
finally:
371
managed_pool.close()
372
```
373
374
### Pool Monitoring and Health Checks
375
376
Monitor pool health and connection status.
377
378
**Usage Example:**
379
380
```python
381
import psycopg2
382
from psycopg2.pool import ThreadedConnectionPool
383
import threading
384
import time
385
386
class MonitoredPool:
387
"""Connection pool with monitoring capabilities."""
388
389
def __init__(self, minconn, maxconn, **db_params):
390
self.pool = ThreadedConnectionPool(minconn, maxconn, **db_params)
391
self.stats = {
392
'connections_created': 0,
393
'connections_borrowed': 0,
394
'connections_returned': 0,
395
'active_connections': 0
396
}
397
self._lock = threading.Lock()
398
399
def getconn(self, key=None):
400
"""Get connection with stats tracking."""
401
with self._lock:
402
self.stats['connections_borrowed'] += 1
403
self.stats['active_connections'] += 1
404
405
conn = self.pool.getconn(key)
406
407
# Test connection health
408
try:
409
with conn.cursor() as cur:
410
cur.execute("SELECT 1")
411
except psycopg2.Error:
412
# Connection is bad, close it and get a new one
413
self.pool.putconn(conn, close=True)
414
conn = self.pool.getconn(key)
415
with self._lock:
416
self.stats['connections_created'] += 1
417
418
return conn
419
420
def putconn(self, conn, key=None, close=False):
421
"""Return connection with stats tracking."""
422
self.pool.putconn(conn, key, close)
423
with self._lock:
424
self.stats['connections_returned'] += 1
425
self.stats['active_connections'] -= 1
426
427
def get_stats(self):
428
"""Get pool statistics."""
429
with self._lock:
430
return self.stats.copy()
431
432
def health_check(self):
433
"""Perform pool health check."""
434
try:
435
conn = self.pool.getconn()
436
with conn.cursor() as cur:
437
cur.execute("SELECT version()")
438
version = cur.fetchone()[0]
439
self.pool.putconn(conn)
440
return True, f"Pool healthy - {version}"
441
except Exception as e:
442
return False, f"Pool unhealthy - {e}"
443
444
def closeall(self):
445
"""Close all connections."""
446
self.pool.closeall()
447
448
# Usage
449
monitored_pool = MonitoredPool(
450
minconn=2,
451
maxconn=6,
452
host="localhost",
453
database="mydb",
454
user="myuser",
455
password="mypass"
456
)
457
458
# Simulate usage
459
def simulate_work():
460
conn = monitored_pool.getconn()
461
time.sleep(0.1) # Simulate work
462
monitored_pool.putconn(conn)
463
464
# Run multiple workers
465
threads = [threading.Thread(target=simulate_work) for _ in range(10)]
466
for t in threads:
467
t.start()
468
for t in threads:
469
t.join()
470
471
# Check stats
472
stats = monitored_pool.get_stats()
473
print(f"Pool stats: {stats}")
474
475
# Health check
476
healthy, message = monitored_pool.health_check()
477
print(f"Health check: {message}")
478
479
monitored_pool.closeall()
480
```
481
482
### Error Handling and Recovery
483
484
Robust error handling patterns for connection pools.
485
486
**Usage Example:**
487
488
```python
489
import psycopg2
490
from psycopg2.pool import ThreadedConnectionPool, PoolError
491
import time
492
import logging
493
494
logger = logging.getLogger(__name__)
495
496
class ResilientPool:
497
"""Connection pool with error recovery."""
498
499
def __init__(self, minconn, maxconn, **db_params):
500
self.minconn = minconn
501
self.maxconn = maxconn
502
self.db_params = db_params
503
self.pool = None
504
self._create_pool()
505
506
def _create_pool(self):
507
"""Create or recreate pool."""
508
try:
509
if self.pool:
510
self.pool.closeall()
511
self.pool = ThreadedConnectionPool(
512
self.minconn, self.maxconn, **self.db_params
513
)
514
logger.info("Pool created successfully")
515
except Exception as e:
516
logger.error(f"Failed to create pool: {e}")
517
raise
518
519
def get_connection(self, retry_count=3):
520
"""Get connection with retry logic."""
521
for attempt in range(retry_count):
522
try:
523
if not self.pool:
524
self._create_pool()
525
526
conn = self.pool.getconn()
527
528
# Test connection
529
with conn.cursor() as cur:
530
cur.execute("SELECT 1")
531
532
return conn
533
534
except PoolError as e:
535
logger.warning(f"Pool exhausted (attempt {attempt + 1}): {e}")
536
if attempt < retry_count - 1:
537
time.sleep(0.1 * (attempt + 1)) # Exponential backoff
538
else:
539
raise
540
541
except psycopg2.OperationalError as e:
542
logger.error(f"Database connection error (attempt {attempt + 1}): {e}")
543
if attempt < retry_count - 1:
544
# Try to recreate pool
545
self._create_pool()
546
time.sleep(0.5 * (attempt + 1))
547
else:
548
raise
549
550
def return_connection(self, conn, close_on_error=True):
551
"""Return connection with error handling."""
552
try:
553
if conn.closed:
554
logger.warning("Returning closed connection")
555
return
556
557
# Check if connection is in a transaction
558
if conn.status != 1: # STATUS_READY
559
conn.rollback()
560
logger.info("Rolled back transaction before returning connection")
561
562
self.pool.putconn(conn)
563
564
except Exception as e:
565
logger.error(f"Error returning connection: {e}")
566
if close_on_error:
567
try:
568
self.pool.putconn(conn, close=True)
569
except:
570
pass
571
572
def close(self):
573
"""Close pool."""
574
if self.pool:
575
self.pool.closeall()
576
self.pool = None
577
578
# Usage
579
resilient_pool = ResilientPool(
580
minconn=1,
581
maxconn=5,
582
host="localhost",
583
database="mydb",
584
user="myuser",
585
password="mypass"
586
)
587
588
try:
589
conn = resilient_pool.get_connection()
590
591
with conn.cursor() as cur:
592
cur.execute("SELECT * FROM users LIMIT 1")
593
result = cur.fetchone()
594
print(f"Query result: {result}")
595
596
resilient_pool.return_connection(conn)
597
598
except Exception as e:
599
print(f"Error: {e}")
600
601
finally:
602
resilient_pool.close()
603
```
604
605
## Types
606
607
### Pool Classes Hierarchy
608
609
```python { .api }
610
class AbstractConnectionPool:
611
"""Base pool class."""
612
613
minconn: int # Minimum connections
614
maxconn: int # Maximum connections
615
closed: bool # Pool status
616
617
class SimpleConnectionPool(AbstractConnectionPool):
618
"""Non-thread-safe pool."""
619
620
def getconn(self, key=None) -> connection:
621
"""Get connection."""
622
623
def putconn(self, conn=None, key=None, close=False) -> None:
624
"""Return connection."""
625
626
def closeall(self) -> None:
627
"""Close all connections."""
628
629
class ThreadedConnectionPool(AbstractConnectionPool):
630
"""Thread-safe pool."""
631
632
def getconn(self, key=None) -> connection:
633
"""Get connection (thread-safe)."""
634
635
def putconn(self, conn=None, key=None, close=False) -> None:
636
"""Return connection (thread-safe)."""
637
638
def closeall(self) -> None:
639
"""Close all connections (thread-safe)."""
640
```
641
642
### Pool Error Types
643
644
```python { .api }
645
class PoolError(psycopg2.Error):
646
"""Pool-related errors."""
647
```
648
649
### Pool Configuration Parameters
650
651
```python { .api }
652
# Pool initialization parameters
653
minconn: int # Minimum connections (1 <= minconn <= maxconn)
654
maxconn: int # Maximum connections
655
*args: tuple # Positional arguments for psycopg2.connect()
656
**kwargs: dict # Keyword arguments for psycopg2.connect()
657
658
# Common connection parameters
659
host: str # Database host
660
port: int # Database port (default: 5432)
661
database: str # Database name
662
user: str # Username
663
password: str # Password
664
```