CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-psycopg2

Python-PostgreSQL Database Adapter

Pending
Overview
Eval results
Files

docs/connection-pooling.md

Connection Pooling

Thread-safe and non-thread-safe connection pools for managing database connections efficiently in multi-threaded applications, reducing connection overhead and improving performance.

Capabilities

Pool Exception Handling

Specialized exception for pool-related errors.

class PoolError(psycopg2.Error):
    """Raised for pool-specific failures, e.g. requesting a connection
    from an exhausted or already-closed pool.
    """

Abstract Connection Pool

Base class for all connection pool implementations with core pooling functionality.

class AbstractConnectionPool:
    """Base connection pool class.

    Holds the shared pooling machinery; SimpleConnectionPool and
    ThreadedConnectionPool expose it through their public
    getconn/putconn/closeall methods.
    """
    
    def __init__(self, minconn, maxconn, *args, **kwargs):
        """
        Initialize pool.
        
        Parameters:
        - minconn (int): Minimum connections to maintain (opened eagerly)
        - maxconn (int): Maximum connections allowed
        - *args: Connection arguments passed to psycopg2.connect()
        - **kwargs: Connection keyword arguments
        """
    
    @property
    def minconn(self):
        """Minimum connections to maintain."""
    
    @property
    def maxconn(self):
        """Maximum connections allowed."""
    
    @property
    def closed(self):
        """True once the pool has been closed and can no longer serve connections."""
    
    def _connect(self, key=None):
        """
        Create new connection (internal).
        
        Parameters:
        - key: Connection key for identification; if given, the new
          connection is tracked under this key
        
        Returns:
        connection: New database connection
        """
    
    def _getkey(self):
        """Generate unique key (internal)."""
    
    def _getconn(self, key=None):
        """
        Get connection (internal).
        
        Parameters:
        - key: Connection key
        
        Returns:
        connection: Database connection
        
        Raises:
        PoolError: If the pool is closed or already at maxconn capacity
        """
    
    def _putconn(self, conn, key=None, close=False):
        """
        Return connection (internal).
        
        Parameters:
        - conn: Connection to return
        - key: Connection key
        - close (bool): Force close connection instead of keeping it pooled
        """
    
    def _closeall(self):
        """Close all connections (internal)."""

Simple Connection Pool

Non-thread-safe connection pool for single-threaded applications with direct access to pool methods.

class SimpleConnectionPool(AbstractConnectionPool):
    """Non-threadsafe connection pool.

    Suitable only for single-threaded use: no locking is performed, so
    concurrent access from multiple threads is unsafe.
    """
    
    def __init__(self, minconn, maxconn, *args, **kwargs):
        """
        Initialize simple pool.
        
        Parameters:
        - minconn (int): Minimum connections (1-maxconn)
        - maxconn (int): Maximum connections
        - *args: Connection arguments
        - **kwargs: Connection keyword arguments
        """
    
    def getconn(self, key=None):
        """
        Get connection from pool.
        
        Parameters:
        - key: Optional connection key for tracking; repeated calls with
          the same key yield the same connection while it is checked out
        
        Returns:
        connection: Database connection
        
        Raises:
        PoolError: If no connections available
        """
    
    def putconn(self, conn=None, key=None, close=False):
        """
        Return connection to pool.
        
        Parameters:
        - conn: Connection to return (if None, uses key)
        - key: Connection key
        - close (bool): Force close connection instead of pooling
        """
    
    def closeall(self):
        """Close all connections and reset pool; getconn() fails afterwards."""

Usage Example:

import psycopg2
from psycopg2.pool import SimpleConnectionPool

# Create connection pool
pool = SimpleConnectionPool(
    minconn=1,
    maxconn=5,
    host="localhost",
    database="mydb",
    user="myuser",
    password="mypass"
)

# Get connection from pool
conn = pool.getconn()

try:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM users")
        users = cur.fetchall()

    # Connection is still open, can be reused
    with conn.cursor() as cur:
        cur.execute("INSERT INTO logs (message) VALUES (%s)", ("User query",))
        conn.commit()

finally:
    # Return connection to pool (don't close it)
    pool.putconn(conn)

# Get another connection (might be the same one)
conn2 = pool.getconn()
try:
    with conn2.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM users")
        count = cur.fetchone()[0]
        print(f"Total users: {count}")
finally:
    # Return this connection too, even if the query raised — otherwise a
    # failed query would leak the connection and shrink the pool.
    pool.putconn(conn2)

# Clean up - close all connections
pool.closeall()

Threaded Connection Pool

Thread-safe connection pool for multi-threaded applications with automatic locking for concurrent access.

class ThreadedConnectionPool(AbstractConnectionPool):
    """Thread-safe connection pool.

    Serializes pool operations with an internal lock, so getconn/putconn
    may be called concurrently from multiple threads.
    """
    
    def __init__(self, minconn, maxconn, *args, **kwargs):
        """
        Initialize threaded pool with locking.
        
        Parameters:
        - minconn (int): Minimum connections (1-maxconn)
        - maxconn (int): Maximum connections
        - *args: Connection arguments
        - **kwargs: Connection keyword arguments
        """
    
    def getconn(self, key=None):
        """
        Get connection (thread-safe).
        
        Parameters:
        - key: Optional connection key for tracking
        
        Returns:
        connection: Database connection
        
        Raises:
        PoolError: If no connections available
        """
    
    def putconn(self, conn=None, key=None, close=False):
        """
        Return connection (thread-safe).
        
        Parameters:
        - conn: Connection to return (if None, uses key)
        - key: Connection key
        - close (bool): Force close connection
        """
    
    def closeall(self):
        """Close all connections (thread-safe)."""

Usage Example:

import psycopg2
import threading
import time
from psycopg2.pool import ThreadedConnectionPool

# Create thread-safe pool
pool = ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    host="localhost",
    database="mydb",
    user="myuser",
    password="mypass"
)

def worker_function(worker_id):
    """Worker function for multi-threading."""
    # Defined before the try so the finally clause cannot hit NameError
    # when pool.getconn() itself raises (e.g. pool exhausted).
    conn = None
    try:
        # Get connection (thread-safe)
        conn = pool.getconn()
        print(f"Worker {worker_id} got connection")

        with conn.cursor() as cur:
            # Simulate work
            cur.execute("SELECT pg_sleep(1)")
            cur.execute("SELECT %s as worker_id", (worker_id,))
            result = cur.fetchone()
            print(f"Worker {worker_id} completed: {result[0]}")

    except Exception as e:
        print(f"Worker {worker_id} error: {e}")
    finally:
        # Only return a connection that was actually acquired.
        if conn is not None:
            pool.putconn(conn)
            print(f"Worker {worker_id} returned connection")

# Create multiple threads
threads = []
for i in range(5):
    thread = threading.Thread(target=worker_function, args=(i,))
    threads.append(thread)
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()

print("All workers completed")
pool.closeall()

Pool Configuration and Management

Advanced pool configuration and monitoring patterns.

Usage Example:

import psycopg2
from psycopg2.pool import ThreadedConnectionPool, PoolError
import contextlib
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ManagedConnectionPool:
    """Wrapper for enhanced pool management.

    Adds logging and a context manager that guarantees borrowed
    connections are always returned to the pool.
    """

    def __init__(self, minconn, maxconn, **db_params):
        """
        Create the underlying ThreadedConnectionPool.

        Parameters:
        - minconn (int): Minimum connections
        - maxconn (int): Maximum connections
        - **db_params: Keyword arguments for psycopg2.connect()
        """
        self.pool = ThreadedConnectionPool(minconn, maxconn, **db_params)
        self.logger = logger

    @contextlib.contextmanager
    def get_connection(self):
        """Context manager for automatic connection handling.

        Yields a pooled connection, rolls back on error, and always
        returns the connection to the pool.

        Raises:
        PoolError: If the pool is exhausted
        """
        conn = None
        try:
            conn = self.pool.getconn()
            self.logger.info("Connection acquired from pool")
            yield conn
        except PoolError as e:
            self.logger.error(f"Pool error: {e}")
            raise
        except Exception as e:
            if conn:
                conn.rollback()
            self.logger.error(f"Database error: {e}")
            raise
        finally:
            if conn:
                self.pool.putconn(conn)
                self.logger.info("Connection returned to pool")

    def execute_query(self, query, params=None):
        """
        Execute a query with automatic connection management.

        Returns:
        list of rows for SELECT-like queries, affected row count otherwise
        """
        with self.get_connection() as conn:
            with conn.cursor() as cur:
                cur.execute(query, params)
                result = cur.fetchall() if cur.description else cur.rowcount
            # Commit unconditionally: psycopg2 opens a transaction even for
            # SELECT, and returning an idle-in-transaction connection to the
            # pool would keep locks and snapshots alive for the next borrower.
            conn.commit()
            return result

    def close(self):
        """Close all pool connections."""
        self.pool.closeall()
        self.logger.info("Pool closed")

# Usage
managed_pool = ManagedConnectionPool(
    minconn=2,
    maxconn=8,
    host="localhost",
    database="mydb",
    user="myuser",
    password="mypass"
)

try:
    # Simple query execution
    users = managed_pool.execute_query("SELECT id, name FROM users LIMIT 5")
    for user in users:
        print(f"User: {user[1]}")

    # Insert with parameters
    rows_affected = managed_pool.execute_query(
        "INSERT INTO logs (message, level) VALUES (%s, %s)",
        ("Application started", "INFO")
    )
    print(f"Inserted {rows_affected} rows")

    # Using context manager directly.
    # Note: no explicit "BEGIN" — psycopg2 opens a transaction implicitly
    # on the first execute(); issuing BEGIN manually is redundant.
    with managed_pool.get_connection() as conn:
        with conn.cursor() as cur:
            cur.execute("INSERT INTO users (name) VALUES (%s)", ("Alice",))
            cur.execute("INSERT INTO profiles (user_id, bio) VALUES (currval('users_id_seq'), %s)", ("Bio",))
            conn.commit()

finally:
    managed_pool.close()

Pool Monitoring and Health Checks

Monitor pool health and connection status.

Usage Example:

import psycopg2
from psycopg2.pool import ThreadedConnectionPool
import threading
import time

class MonitoredPool:
    """Connection pool wrapper that tracks borrow/return statistics.

    Wraps a ThreadedConnectionPool and keeps counters guarded by a lock
    so get_stats() is safe to call from any thread.
    """

    def __init__(self, minconn, maxconn, **db_params):
        """
        Initialize the wrapped pool and the statistics counters.

        Parameters:
        - minconn (int): Minimum connections
        - maxconn (int): Maximum connections
        - **db_params: Keyword arguments for psycopg2.connect()
        """
        self.pool = ThreadedConnectionPool(minconn, maxconn, **db_params)
        self.stats = {
            'connections_created': 0,
            'connections_borrowed': 0,
            'connections_returned': 0,
            'active_connections': 0
        }
        self._lock = threading.Lock()

    def getconn(self, key=None):
        """
        Get a connection with stats tracking and a liveness check.

        Returns:
        connection: A connection that answered the probe query

        Raises:
        PoolError: If the underlying pool is exhausted
        """
        conn = self.pool.getconn(key)

        # Health check: a pooled connection may have been severed since it
        # was last used; discard it and fetch a replacement if the probe fails.
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1")
        except psycopg2.Error:
            self.pool.putconn(conn, close=True)
            conn = self.pool.getconn(key)
            with self._lock:
                self.stats['connections_created'] += 1

        # Count only after a successful checkout so a raising getconn()
        # does not inflate the borrowed/active counters.
        with self._lock:
            self.stats['connections_borrowed'] += 1
            self.stats['active_connections'] += 1

        return conn

    def putconn(self, conn, key=None, close=False):
        """Return a connection and update the counters."""
        self.pool.putconn(conn, key, close)
        with self._lock:
            self.stats['connections_returned'] += 1
            self.stats['active_connections'] -= 1

    def get_stats(self):
        """Return a snapshot copy of the pool statistics."""
        with self._lock:
            return self.stats.copy()

    def health_check(self):
        """
        Perform a pool health check.

        Returns:
        tuple (bool, str): (healthy, human-readable status message)
        """
        try:
            conn = self.pool.getconn()
            with conn.cursor() as cur:
                cur.execute("SELECT version()")
                version = cur.fetchone()[0]
            self.pool.putconn(conn)
            return True, f"Pool healthy - {version}"
        except Exception as e:
            return False, f"Pool unhealthy - {e}"

    def closeall(self):
        """Close all connections in the wrapped pool."""
        self.pool.closeall()

# Usage
monitored_pool = MonitoredPool(
    minconn=2,
    maxconn=6,
    host="localhost",
    database="mydb",
    user="myuser",
    password="mypass"
)

# Simulate usage
def simulate_work():
    """Borrow a connection, pretend to do work, then hand it back."""
    borrowed = monitored_pool.getconn()
    time.sleep(0.1)  # Simulate work
    monitored_pool.putconn(borrowed)

# Run multiple workers
workers = []
for _ in range(10):
    worker = threading.Thread(target=simulate_work)
    workers.append(worker)
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

# Check stats
stats = monitored_pool.get_stats()
print(f"Pool stats: {stats}")

# Health check
healthy, message = monitored_pool.health_check()
print(f"Health check: {message}")

monitored_pool.closeall()

Error Handling and Recovery

Robust error handling patterns for connection pools.

Usage Example:

import psycopg2
from psycopg2.pool import ThreadedConnectionPool, PoolError
import time
import logging

# Module-level logger: library code should log under __name__, not the root logger.
logger = logging.getLogger(__name__)

class ResilientPool:
    """Connection pool wrapper that recreates the pool and retries on failure."""

    def __init__(self, minconn, maxconn, **db_params):
        """
        Store the pool configuration and create the initial pool.

        Parameters:
        - minconn (int): Minimum connections
        - maxconn (int): Maximum connections
        - **db_params: Keyword arguments for psycopg2.connect()
        """
        self.minconn = minconn
        self.maxconn = maxconn
        self.db_params = db_params
        self.pool = None
        self._create_pool()

    def _create_pool(self):
        """Create (or recreate) the pool, closing any previous one first."""
        try:
            if self.pool:
                self.pool.closeall()
            self.pool = ThreadedConnectionPool(
                self.minconn, self.maxconn, **self.db_params
            )
            logger.info("Pool created successfully")
        except Exception as e:
            logger.error(f"Failed to create pool: {e}")
            raise

    def get_connection(self, retry_count=3):
        """
        Get a verified connection, retrying on pool exhaustion or dead links.

        Parameters:
        - retry_count (int): Number of attempts; must be at least 1

        Returns:
        connection: A connection that answered a test query

        Raises:
        ValueError: If retry_count < 1
        PoolError: If the pool stays exhausted after all retries
        psycopg2.OperationalError: If the database stays unreachable
        """
        # Guard: with retry_count < 1 the loop below would silently return None.
        if retry_count < 1:
            raise ValueError("retry_count must be at least 1")

        for attempt in range(retry_count):
            conn = None
            try:
                if not self.pool:
                    self._create_pool()

                conn = self.pool.getconn()

                # Validate before handing out: a pooled connection may have
                # been severed by a server restart or network drop.
                with conn.cursor() as cur:
                    cur.execute("SELECT 1")

                return conn

            except PoolError as e:
                logger.warning(f"Pool exhausted (attempt {attempt + 1}): {e}")
                if attempt == retry_count - 1:
                    raise
                time.sleep(0.1 * (attempt + 1))  # linear backoff

            except psycopg2.OperationalError as e:
                logger.error(f"Database connection error (attempt {attempt + 1}): {e}")
                # Discard the dead connection instead of leaking it.
                if conn is not None:
                    try:
                        self.pool.putconn(conn, close=True)
                    except Exception:
                        pass
                if attempt == retry_count - 1:
                    raise
                # Try to recreate pool before the next attempt
                self._create_pool()
                time.sleep(0.5 * (attempt + 1))

    def return_connection(self, conn, close_on_error=True):
        """
        Return a connection to the pool with error handling.

        Parameters:
        - conn: Connection to return
        - close_on_error (bool): On failure, force-close the connection
        """
        try:
            if conn.closed:
                logger.warning("Returning closed connection")
                return

            # STATUS_READY == 1; any other status means an open transaction
            # that must not be handed to the next borrower.
            if conn.status != 1:
                conn.rollback()
                logger.info("Rolled back transaction before returning connection")

            self.pool.putconn(conn)

        except Exception as e:
            logger.error(f"Error returning connection: {e}")
            if close_on_error:
                try:
                    self.pool.putconn(conn, close=True)
                except Exception:
                    # Best effort: the connection is already unusable.
                    pass

    def close(self):
        """Close the pool and drop the reference."""
        if self.pool:
            self.pool.closeall()
            self.pool = None

# Usage
resilient_pool = ResilientPool(
    minconn=1,
    maxconn=5,
    host="localhost",
    database="mydb",
    user="myuser",
    password="mypass"
)

try:
    db_conn = resilient_pool.get_connection()

    with db_conn.cursor() as cursor:
        cursor.execute("SELECT * FROM users LIMIT 1")
        result = cursor.fetchone()
        print(f"Query result: {result}")

    resilient_pool.return_connection(db_conn)

except Exception as e:
    print(f"Error: {e}")

finally:
    resilient_pool.close()

Types

Pool Classes Hierarchy

class AbstractConnectionPool:
    """Base pool class: attributes shared by all pool implementations."""
    
    minconn: int  # Minimum connections kept in the pool
    maxconn: int  # Maximum connections the pool may open
    closed: bool  # Pool status: True once the pool has been closed

class SimpleConnectionPool(AbstractConnectionPool):
    """Non-thread-safe pool (single-threaded use only)."""
    
    def getconn(self, key=None) -> connection:
        """Get connection; raises PoolError when the pool is exhausted."""
    
    def putconn(self, conn=None, key=None, close=False) -> None:
        """Return connection; close=True discards it instead of pooling."""
    
    def closeall(self) -> None:
        """Close all connections."""

class ThreadedConnectionPool(AbstractConnectionPool):
    """Thread-safe pool (operations serialized by an internal lock)."""
    
    def getconn(self, key=None) -> connection:
        """Get connection (thread-safe); raises PoolError when exhausted."""
    
    def putconn(self, conn=None, key=None, close=False) -> None:
        """Return connection (thread-safe)."""
    
    def closeall(self) -> None:
        """Close all connections (thread-safe)."""

Pool Error Types

class PoolError(psycopg2.Error):
    """Pool-related errors (e.g. exhausted or closed pool)."""

Pool Configuration Parameters

# Pool initialization parameters
minconn: int  # Minimum connections (1 <= minconn <= maxconn)
maxconn: int  # Maximum connections
*args: tuple  # Positional arguments for psycopg2.connect()
**kwargs: dict  # Keyword arguments for psycopg2.connect()

# Common connection parameters
host: str  # Database host
port: int  # Database port (default: 5432)
database: str  # Database name
user: str  # Username
password: str  # Password

Install with Tessl CLI

npx tessl i tessl/pypi-psycopg2

docs

advanced-cursors.md

batch-operations.md

connection-pooling.md

connections-cursors.md

error-handling.md

index.md

replication.md

sql-composition.md

timezone-support.md

type-adaptation.md

tile.json