Python-PostgreSQL Database Adapter
—
Thread-safe and non-thread-safe connection pools for managing database connections efficiently in multi-threaded applications, reducing connection overhead and improving performance.
Specialized exception for pool-related errors.
class PoolError(psycopg2.Error):
"""Pool-related errors."""

Base class for all connection pool implementations with core pooling functionality.
class AbstractConnectionPool:
"""Base connection pool class."""
def __init__(self, minconn, maxconn, *args, **kwargs):
"""
Initialize pool.
Parameters:
- minconn (int): Minimum connections to maintain
- maxconn (int): Maximum connections allowed
- *args: Connection arguments passed to psycopg2.connect()
- **kwargs: Connection keyword arguments
"""
@property
def minconn(self):
"""Minimum connections to maintain."""
@property
def maxconn(self):
"""Maximum connections allowed."""
@property
def closed(self):
"""Pool closed status."""
def _connect(self, key=None):
"""
Create new connection (internal).
Parameters:
- key: Connection key for identification
Returns:
connection: New database connection
"""
def _getkey(self):
"""Generate unique key (internal)."""
def _getconn(self, key=None):
"""
Get connection (internal).
Parameters:
- key: Connection key
Returns:
connection: Database connection
"""
def _putconn(self, conn, key=None, close=False):
"""
Return connection (internal).
Parameters:
- conn: Connection to return
- key: Connection key
- close (bool): Force close connection
"""
def _closeall(self):
"""Close all connections (internal)."""

Non-thread-safe connection pool for single-threaded applications with direct access to pool methods.
class SimpleConnectionPool(AbstractConnectionPool):
"""Non-threadsafe connection pool."""
def __init__(self, minconn, maxconn, *args, **kwargs):
"""
Initialize simple pool.
Parameters:
- minconn (int): Minimum connections (1-maxconn)
- maxconn (int): Maximum connections
- *args: Connection arguments
- **kwargs: Connection keyword arguments
"""
def getconn(self, key=None):
"""
Get connection from pool.
Parameters:
- key: Optional connection key for tracking
Returns:
connection: Database connection
Raises:
PoolError: If no connections available
"""
def putconn(self, conn=None, key=None, close=False):
"""
Return connection to pool.
Parameters:
- conn: Connection to return (if None, uses key)
- key: Connection key
- close (bool): Force close connection instead of pooling
"""
def closeall(self):
"""Close all connections and reset pool."""

Usage Example:
import psycopg2
from psycopg2.pool import SimpleConnectionPool
# Create connection pool
pool = SimpleConnectionPool(
minconn=1,
maxconn=5,
host="localhost",
database="mydb",
user="myuser",
password="mypass"
)
# Get connection from pool
conn = pool.getconn()
try:
with conn.cursor() as cur:
cur.execute("SELECT * FROM users")
users = cur.fetchall()
# Connection is still open, can be reused
with conn.cursor() as cur:
cur.execute("INSERT INTO logs (message) VALUES (%s)", ("User query",))
conn.commit()
finally:
# Return connection to pool (don't close it)
pool.putconn(conn)
# Get another connection (might be the same one)
conn2 = pool.getconn()
with conn2.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM users")
count = cur.fetchone()[0]
print(f"Total users: {count}")
pool.putconn(conn2)
# Clean up - close all connections
pool.closeall()

Thread-safe connection pool for multi-threaded applications with automatic locking for concurrent access.
class ThreadedConnectionPool(AbstractConnectionPool):
"""Thread-safe connection pool."""
def __init__(self, minconn, maxconn, *args, **kwargs):
"""
Initialize threaded pool with locking.
Parameters:
- minconn (int): Minimum connections (1-maxconn)
- maxconn (int): Maximum connections
- *args: Connection arguments
- **kwargs: Connection keyword arguments
"""
def getconn(self, key=None):
"""
Get connection (thread-safe).
Parameters:
- key: Optional connection key for tracking
Returns:
connection: Database connection
Raises:
PoolError: If no connections available
"""
def putconn(self, conn=None, key=None, close=False):
"""
Return connection (thread-safe).
Parameters:
- conn: Connection to return (if None, uses key)
- key: Connection key
- close (bool): Force close connection
"""
def closeall(self):
"""Close all connections (thread-safe)."""

Usage Example:
import psycopg2
import threading
import time
from psycopg2.pool import ThreadedConnectionPool
# Create thread-safe pool
pool = ThreadedConnectionPool(
minconn=2,
maxconn=10,
host="localhost",
database="mydb",
user="myuser",
password="mypass"
)
def worker_function(worker_id):
    """Run one unit of demo work on a pooled connection.

    Borrows a connection from the module-level ``pool``, runs two queries,
    and hands the connection back. Any error is reported on stdout rather
    than propagated, so one failing worker does not kill its thread noisily.

    Parameters:
    - worker_id: Identifier echoed in the log output and in the query
    """
    # Bind the name up front: if getconn() itself fails (pool exhausted,
    # server unreachable) the finally-clause must not touch an unbound local.
    conn = None
    try:
        # Get connection (thread-safe)
        conn = pool.getconn()
        print(f"Worker {worker_id} got connection")
        with conn.cursor() as cur:
            # Simulate work
            cur.execute("SELECT pg_sleep(1)")
            cur.execute("SELECT %s as worker_id", (worker_id,))
            result = cur.fetchone()
            print(f"Worker {worker_id} completed: {result[0]}")
    except Exception as e:
        print(f"Worker {worker_id} error: {e}")
    finally:
        # Return the connection only if one was actually acquired
        if conn is not None:
            pool.putconn(conn)
            print(f"Worker {worker_id} returned connection")
# Create multiple threads
threads = []
for i in range(5):
thread = threading.Thread(target=worker_function, args=(i,))
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
print("All workers completed")
pool.closeall()

Advanced pool configuration and monitoring patterns.
Usage Example:
import psycopg2
from psycopg2.pool import ThreadedConnectionPool, PoolError
import contextlib
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ManagedConnectionPool:
    """Wrapper around ThreadedConnectionPool adding logging and scoped checkout.

    Connections are borrowed through ``get_connection()``, which always hands
    the connection back to the pool, and ``execute_query()`` runs a single
    statement and commits it.
    """

    def __init__(self, minconn, maxconn, **db_params):
        """
        Create the underlying pool.
        Parameters:
        - minconn (int): Minimum connections, forwarded to ThreadedConnectionPool
        - maxconn (int): Maximum connections, forwarded to ThreadedConnectionPool
        - **db_params: Connection keyword arguments forwarded to the pool
        """
        self.pool = ThreadedConnectionPool(minconn, maxconn, **db_params)
        self.logger = logger

    @contextlib.contextmanager
    def get_connection(self):
        """
        Context manager for automatic connection handling.
        Yields:
            connection: A connection borrowed from the pool
        Raises:
            PoolError: If the pool cannot hand out a connection
            Exception: Anything raised inside the ``with`` body is re-raised
                after the transaction has been rolled back
        """
        # Acquire outside the body-handling try so that a failed checkout
        # never reaches the rollback/putconn logic below.
        try:
            conn = self.pool.getconn()
        except PoolError as e:
            self.logger.error("Pool error: %s", e)
            raise
        except Exception as e:
            self.logger.error("Database error: %s", e)
            raise
        self.logger.info("Connection acquired from pool")
        try:
            yield conn
        except Exception as e:
            self.logger.error("Database error: %s", e)
            # rollback() on an already-closed connection would raise and
            # mask the original error, so only roll back a live connection.
            if not conn.closed:
                conn.rollback()
            raise
        finally:
            self.pool.putconn(conn)
            self.logger.info("Connection returned to pool")

    def execute_query(self, query, params=None):
        """
        Execute one statement with automatic connection management.
        Parameters:
        - query: SQL text, with placeholders for ``params``
        - params: Optional parameters passed to ``cursor.execute``
        Returns:
            list: Fetched rows when the statement produces a result set
            int: ``cursor.rowcount`` otherwise
        """
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                # cur.description is set for every statement that yields rows,
                # which includes INSERT/UPDATE/DELETE ... RETURNING.
                if cur.description:
                    result = cur.fetchall()
                else:
                    result = cur.rowcount
            # Commit unconditionally: committing only when there is no result
            # set would leave data-modifying statements with RETURNING
            # uncommitted when the connection goes back to the pool.
            conn.commit()
            return result

    def close(self):
        """Close all pool connections."""
        self.pool.closeall()
        self.logger.info("Pool closed")
# Usage
managed_pool = ManagedConnectionPool(
minconn=2,
maxconn=8,
host="localhost",
database="mydb",
user="myuser",
password="mypass"
)
try:
# Simple query execution
users = managed_pool.execute_query("SELECT id, name FROM users LIMIT 5")
for user in users:
print(f"User: {user[1]}")
# Insert with parameters
rows_affected = managed_pool.execute_query(
"INSERT INTO logs (message, level) VALUES (%s, %s)",
("Application started", "INFO")
)
print(f"Inserted {rows_affected} rows")
# Using context manager directly
with managed_pool.get_connection() as conn:
with conn.cursor() as cur:
cur.execute("BEGIN")
cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
cur.execute("INSERT INTO profiles (user_id, bio) VALUES (currval('users_id_seq'), %s)", ("Bio",))
conn.commit()
finally:
managed_pool.close()Monitor pool health and connection status.
Usage Example:
import psycopg2
from psycopg2.pool import ThreadedConnectionPool
import threading
import time
class MonitoredPool:
    """Connection pool with monitoring capabilities.

    Wraps a ThreadedConnectionPool and keeps simple counters in ``stats``;
    the counters are guarded by an internal lock.
    """

    def __init__(self, minconn, maxconn, **db_params):
        """
        Create the underlying pool and zeroed counters.
        Parameters:
        - minconn (int): Minimum connections, forwarded to ThreadedConnectionPool
        - maxconn (int): Maximum connections, forwarded to ThreadedConnectionPool
        - **db_params: Connection keyword arguments forwarded to the pool
        """
        self.pool = ThreadedConnectionPool(minconn, maxconn, **db_params)
        self.stats = {
            'connections_created': 0,
            'connections_borrowed': 0,
            'connections_returned': 0,
            'active_connections': 0
        }
        self._lock = threading.Lock()

    def getconn(self, key=None):
        """
        Get a health-checked connection with stats tracking.
        Parameters:
        - key: Optional connection key, forwarded to the pool
        Returns:
            connection: Database connection that answered ``SELECT 1``, or a
            replacement if the first candidate failed the probe
        Raises:
            Whatever the underlying pool raises when no connection can be
            handed out; the counters are left untouched in that case.
        """
        conn = self.pool.getconn(key)
        replaced = False
        # Test connection health
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
            # End the transaction the probe opened so the caller receives a
            # connection with no transaction in progress.
            conn.rollback()
        except psycopg2.Error:
            # Connection is bad, close it and get a new one
            self.pool.putconn(conn, key, close=True)
            conn = self.pool.getconn(key)
            replaced = True
        # Count only after a connection is really in hand; counting before
        # the checkout would leave active_connections inflated forever when
        # the pool raises.
        with self._lock:
            self.stats['connections_borrowed'] += 1
            self.stats['active_connections'] += 1
            if replaced:
                self.stats['connections_created'] += 1
        return conn

    def putconn(self, conn, key=None, close=False):
        """
        Return connection with stats tracking.
        Parameters:
        - conn: Connection to return
        - key: Connection key, forwarded to the pool
        - close (bool): Force close connection instead of pooling
        """
        self.pool.putconn(conn, key, close)
        with self._lock:
            self.stats['connections_returned'] += 1
            self.stats['active_connections'] -= 1

    def get_stats(self):
        """Return a snapshot copy of the pool statistics."""
        with self._lock:
            return self.stats.copy()

    def health_check(self):
        """
        Perform pool health check.
        Returns:
            tuple: ``(True, message)`` with the server version when a pooled
            connection answers, ``(False, message)`` with the error otherwise.
        Note: this goes straight to the underlying pool, so it does not
        affect the counters in ``stats``.
        """
        try:
            conn = self.pool.getconn()
        except Exception as e:
            return False, f"Pool unhealthy - {e}"
        probe_failed = True
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT version()")
                version = cur.fetchone()[0]
            probe_failed = False
            return True, f"Pool healthy - {version}"
        except Exception as e:
            return False, f"Pool unhealthy - {e}"
        finally:
            # Always give the connection back; previously a failing probe
            # leaked it. A connection that failed the probe is discarded.
            self.pool.putconn(conn, close=probe_failed)

    def closeall(self):
        """Close all connections."""
        self.pool.closeall()
# Usage
monitored_pool = MonitoredPool(
minconn=2,
maxconn=6,
host="localhost",
database="mydb",
user="myuser",
password="mypass"
)
# Simulate usage
def simulate_work():
conn = monitored_pool.getconn()
time.sleep(0.1) # Simulate work
monitored_pool.putconn(conn)
# Run multiple workers
threads = [threading.Thread(target=simulate_work) for _ in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
# Check stats
stats = monitored_pool.get_stats()
print(f"Pool stats: {stats}")
# Health check
healthy, message = monitored_pool.health_check()
print(f"Health check: {message}")
monitored_pool.closeall()

Robust error handling patterns for connection pools.
Usage Example:
import psycopg2
from psycopg2.pool import ThreadedConnectionPool, PoolError
import time
import logging
logger = logging.getLogger(__name__)


class ResilientPool:
    """Connection pool with error recovery.

    Wraps a ThreadedConnectionPool, probing connections on checkout and
    retrying (with pool re-creation) when the probe or the checkout fails.
    """

    # Numeric value of psycopg2.extensions.STATUS_READY: connection open and
    # no transaction in progress.
    _STATUS_READY = 1

    def __init__(self, minconn, maxconn, **db_params):
        """
        Remember the pool settings and create the first pool.
        Parameters:
        - minconn (int): Minimum connections, forwarded to ThreadedConnectionPool
        - maxconn (int): Maximum connections, forwarded to ThreadedConnectionPool
        - **db_params: Connection keyword arguments forwarded to the pool
        """
        self.minconn = minconn
        self.maxconn = maxconn
        self.db_params = db_params
        self.pool = None
        self._create_pool()

    def _create_pool(self):
        """Create or recreate pool, closing any previous pool first."""
        previous, self.pool = self.pool, None
        if previous:
            # Best effort: a pool that is already closed (or otherwise
            # broken) must not prevent building its replacement.
            try:
                previous.closeall()
            except Exception as e:
                logger.warning("Ignoring error while closing old pool: %s", e)
        try:
            self.pool = ThreadedConnectionPool(
                self.minconn, self.maxconn, **self.db_params
            )
        except Exception as e:
            logger.error("Failed to create pool: %s", e)
            raise
        logger.info("Pool created successfully")

    def _discard(self, conn):
        """Hand a broken connection back to the pool for closing (best effort)."""
        if conn is None or not self.pool:
            return
        try:
            self.pool.putconn(conn, close=True)
        except Exception as e:
            logger.warning("Could not discard connection: %s", e)

    def get_connection(self, retry_count=3):
        """
        Get a probed connection with retry logic.
        Parameters:
        - retry_count (int): Total number of attempts, must be >= 1
        Returns:
            connection: Connection that answered ``SELECT 1``
        Raises:
            ValueError: If ``retry_count`` is less than 1
            PoolError: If the pool is still exhausted on the last attempt
            psycopg2.OperationalError / psycopg2.InterfaceError: If the
                database is still unreachable on the last attempt
        """
        if retry_count < 1:
            # An empty loop would silently return None instead of a connection.
            raise ValueError("retry_count must be at least 1")
        for attempt in range(retry_count):
            is_last = attempt == retry_count - 1
            conn = None
            try:
                if not self.pool:
                    self._create_pool()
                conn = self.pool.getconn()
                # Test connection
                with conn.cursor() as cur:
                    cur.execute("SELECT 1")
                # Close the transaction opened by the probe.
                conn.rollback()
                return conn
            except PoolError as e:
                logger.warning("Pool exhausted (attempt %s): %s", attempt + 1, e)
                if is_last:
                    raise
                time.sleep(0.1 * (attempt + 1))  # Linearly growing backoff
            except (psycopg2.OperationalError, psycopg2.InterfaceError) as e:
                # InterfaceError covers a stale pooled connection that is
                # already closed on the client side.
                logger.error(
                    "Database connection error (attempt %s): %s", attempt + 1, e
                )
                # Give the failed connection back so its slot is not leaked.
                self._discard(conn)
                if is_last:
                    raise
                # Try to recreate pool.
                # NOTE(review): closeall() on the old pool also closes
                # connections currently checked out by other callers.
                self._create_pool()
                time.sleep(0.5 * (attempt + 1))

    def return_connection(self, conn, close_on_error=True):
        """
        Return connection with error handling.
        Parameters:
        - conn: Connection previously obtained from ``get_connection``
        - close_on_error (bool): When returning fails, retry once asking the
          pool to close the connection
        """
        try:
            if conn.closed:
                logger.warning("Returning closed connection")
                # Still hand it back: otherwise the pool keeps counting the
                # dead connection as in use and eventually runs out of slots.
                self.pool.putconn(conn, close=True)
                return
            # Check if connection is in a transaction
            if conn.status != self._STATUS_READY:
                conn.rollback()
                logger.info("Rolled back transaction before returning connection")
            self.pool.putconn(conn)
        except Exception as e:
            logger.error("Error returning connection: %s", e)
            if close_on_error:
                try:
                    self.pool.putconn(conn, close=True)
                except Exception as close_error:
                    # Deliberately best effort, but leave a trace.
                    logger.warning(
                        "Could not close connection on return: %s", close_error
                    )

    def close(self):
        """Close pool."""
        if self.pool:
            self.pool.closeall()
            self.pool = None
# Usage
resilient_pool = ResilientPool(
minconn=1,
maxconn=5,
host="localhost",
database="mydb",
user="myuser",
password="mypass"
)
try:
conn = resilient_pool.get_connection()
with conn.cursor() as cur:
cur.execute("SELECT * FROM users LIMIT 1")
result = cur.fetchone()
print(f"Query result: {result}")
resilient_pool.return_connection(conn)
except Exception as e:
print(f"Error: {e}")
finally:
resilient_pool.close()

class AbstractConnectionPool:
"""Base pool class."""
minconn: int # Minimum connections
maxconn: int # Maximum connections
closed: bool # Pool status
class SimpleConnectionPool(AbstractConnectionPool):
"""Non-thread-safe pool."""
def getconn(self, key=None) -> connection:
"""Get connection."""
def putconn(self, conn=None, key=None, close=False) -> None:
"""Return connection."""
def closeall(self) -> None:
"""Close all connections."""
class ThreadedConnectionPool(AbstractConnectionPool):
"""Thread-safe pool."""
def getconn(self, key=None) -> connection:
"""Get connection (thread-safe)."""
def putconn(self, conn=None, key=None, close=False) -> None:
"""Return connection (thread-safe)."""
def closeall(self) -> None:
"""Close all connections (thread-safe)."""

class PoolError(psycopg2.Error):
"""Pool-related errors."""

# Pool initialization parameters
minconn: int # Minimum connections (1 <= minconn <= maxconn)
maxconn: int # Maximum connections
*args: tuple # Positional arguments for psycopg2.connect()
**kwargs: dict # Keyword arguments for psycopg2.connect()
# Common connection parameters
host: str # Database host
port: int # Database port (default: 5432)
database: str # Database name
user: str # Username
password: str # Password

Install with Tessl CLI
npx tessl i tessl/pypi-psycopg2