CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-psycopg2-binary

PostgreSQL database adapter for Python with thread-safe connection pooling and SQL operations

Pending
Overview
Eval results
Files

connection-pooling.mddocs/

Connection Pooling

Thread-safe and simple connection pools for managing database connections in multi-threaded applications. Connection pooling reduces connection overhead, manages connection limits, and provides automatic connection recycling with configurable pool sizes.

Capabilities

Abstract Connection Pool

Base class providing core pooling functionality with connection management, recycling, and cleanup.

class AbstractConnectionPool:
    """Generic key-based pooling code."""
    
    def __init__(self, minconn, maxconn, *args, **kwargs):
        """
        Initialize connection pool.
        
        Parameters:
        - minconn (int): Minimum number of connections to maintain
        - maxconn (int): Maximum number of connections allowed
        - *args: Arguments passed to psycopg2.connect()
        - **kwargs: Keyword arguments passed to psycopg2.connect()
        """
    
    def _connect(self, key=None):
        """
        Create new connection.
        
        Parameters:
        - key: Optional connection key
        
        Returns:
        connection: New database connection
        """
    
    def _getconn(self, key=None):
        """
        Get connection from pool.
        
        Parameters:
        - key: Optional connection key
        
        Returns:
        connection: Available connection
        """
    
    def _putconn(self, conn, key=None, close=False):
        """
        Return connection to pool.
        
        Parameters:
        - conn (connection): Connection to return
        - key: Connection key
        - close (bool): Force close connection
        """
    
    def _closeall(self):
        """Close all connections in pool."""
    
    @property
    def closed(self):
        """Pool closed status."""
    
    @property 
    def minconn(self):
        """Minimum connections maintained."""
    
    @property
    def maxconn(self):
        """Maximum connections allowed."""

Simple Connection Pool

Connection pool for single-threaded applications without locking mechanisms.

class SimpleConnectionPool(AbstractConnectionPool):
    """Connection pool for single-threaded applications."""
    
    def getconn(self, key=None):
        """
        Get connection from pool.
        
        Parameters:
        - key: Optional connection identifier
        
        Returns:
        connection: Available database connection
        
        Raises:
        PoolError: If pool is closed or exhausted
        """
    
    def putconn(self, conn, key=None, close=False):
        """
        Return connection to pool.
        
        Parameters:
        - conn (connection): Connection to return
        - key: Connection key (auto-detected if None)
        - close (bool): Force close instead of recycling
        
        Raises:
        PoolError: If pool is closed or connection invalid
        """
    
    def closeall(self):
        """
        Close all connections in pool.
        
        Raises:
        PoolError: If pool already closed
        """

Usage examples:

from psycopg2.pool import SimpleConnectionPool

# Create simple pool
pool = SimpleConnectionPool(
    2,  # minimum connections
    10, # maximum connections  
    host="localhost",
    database="mydb",
    user="postgres",
    password="secret"
)

# Get connection
conn = pool.getconn()

try:
    # Use connection
    cur = conn.cursor()
    cur.execute("SELECT * FROM users")
    users = cur.fetchall()
    cur.close()
finally:
    # Always return connection
    pool.putconn(conn)

# Keyed connections
conn1 = pool.getconn("user_queries")
conn2 = pool.getconn("admin_queries")

# Return keyed connections
pool.putconn(conn1, "user_queries")  
pool.putconn(conn2, "admin_queries")

# Close pool when done
pool.closeall()

Threaded Connection Pool

Thread-safe connection pool with locking for multi-threaded applications.

class ThreadedConnectionPool(AbstractConnectionPool):
    """Thread-safe connection pool."""
    
    def __init__(self, minconn, maxconn, *args, **kwargs):
        """
        Initialize threaded pool.
        
        Parameters:
        - minconn (int): Minimum connections to maintain
        - maxconn (int): Maximum connections allowed
        - *args: Arguments for psycopg2.connect()
        - **kwargs: Keyword arguments for psycopg2.connect()
        """
    
    def getconn(self, key=None):
        """
        Thread-safe get connection from pool.
        
        Parameters:
        - key: Optional connection identifier
        
        Returns:
        connection: Available database connection
        
        Raises:
        PoolError: If pool is closed or exhausted
        """
    
    def putconn(self, conn=None, key=None, close=False):
        """
        Thread-safe return connection to pool.
        
        Parameters:
        - conn (connection, optional): Connection to return
        - key: Connection key (auto-detected if conn is None)
        - close (bool): Force close instead of recycling
        
        Raises:
        PoolError: If pool is closed or connection invalid
        """
    
    def closeall(self):
        """
        Thread-safe close all connections.
        
        Raises:
        PoolError: If pool already closed
        """

Usage examples:

from psycopg2.pool import ThreadedConnectionPool
import threading

# Create thread-safe pool
pool = ThreadedConnectionPool(
    1,   # minimum connections
    20,  # maximum connections
    host="localhost", 
    database="mydb",
    user="postgres",
    password="secret"
)

def worker_function(worker_id):
    """Worker function for threading example."""
    conn = None
    try:
        # Get connection (thread-safe)
        conn = pool.getconn()
        
        # Use connection
        cur = conn.cursor()
        cur.execute("SELECT COUNT(*) FROM large_table WHERE worker_id = %s", 
                   (worker_id,))
        count = cur.fetchone()[0]
        cur.close()
        
        print(f"Worker {worker_id}: {count} records")
        
    finally:
        if conn:
            # Return connection (thread-safe)
            pool.putconn(conn)

# Create multiple threads
threads = []
for i in range(5):
    thread = threading.Thread(target=worker_function, args=(i,))
    threads.append(thread)
    thread.start()

# Wait for completion
for thread in threads:
    thread.join()

# Cleanup
pool.closeall()

Pool Context Managers

Connection pools support context manager protocol for automatic cleanup.

# Pool context manager
with ThreadedConnectionPool(1, 10, **conn_params) as pool:
    conn = pool.getconn()
    try:
        # Use connection
        cur = conn.cursor()
        cur.execute("SELECT 1")
        result = cur.fetchone()
    finally:
        pool.putconn(conn)
# Pool automatically closed

# Connection context manager with pool
def get_pooled_connection():
    conn = pool.getconn()
    try:
        yield conn
    finally:
        pool.putconn(conn)

# Usage
with get_pooled_connection() as conn:
    cur = conn.cursor()
    cur.execute("SELECT * FROM users")
    users = cur.fetchall()

Advanced Pool Management

Connection pools provide advanced features for monitoring and management.

# Pool status monitoring
print(f"Pool closed: {pool.closed}")
print(f"Min connections: {pool.minconn}")
print(f"Max connections: {pool.maxconn}")

# Force close connections
conn = pool.getconn()
pool.putconn(conn, close=True)  # Force close instead of recycling

# Connection validation and recycling
# Pools automatically handle:
# - Connection validation before reuse
# - Transaction rollback on return
# - Connection replacement for closed connections
# - Automatic cleanup of idle connections

Pool Error Handling

Comprehensive error handling for pool operations.

from psycopg2.pool import PoolError

try:
    conn = pool.getconn()
    # Use connection
except PoolError as e:
    if "exhausted" in str(e):
        print("No connections available - consider increasing maxconn")
    elif "closed" in str(e):
        print("Pool has been closed")
    else:
        print(f"Pool error: {e}")

# Handling connection failures
try:
    pool.putconn(invalid_conn)
except PoolError as e:
    print(f"Cannot return connection: {e}")

Custom Connection Factories with Pools

Use custom connection classes with connection pools.

from psycopg2.extras import DictConnection

# Pool with custom connection factory
class DictConnectionPool(ThreadedConnectionPool):
    def _connect(self, key=None):
        """Create connection with DictConnection factory."""
        conn = psycopg2.connect(*self._args, 
                               connection_factory=DictConnection,
                               **self._kwargs)
        if key is not None:
            self._used[key] = conn
            self._rused[id(conn)] = key
        else:
            self._pool.append(conn)
        return conn

# Use custom pool
dict_pool = DictConnectionPool(1, 10, **conn_params)
conn = dict_pool.getconn()  # Returns DictConnection
cur = conn.cursor()         # Returns DictCursor by default

Types

Pool Exceptions

class PoolError(psycopg2.Error):
    """Exception raised by connection pool operations."""
    pass

Pool Configuration

PoolConfig = {
    'minconn': int,      # Minimum connections to maintain
    'maxconn': int,      # Maximum connections allowed
    'host': str,         # Database host
    'port': int,         # Database port
    'database': str,     # Database name
    'user': str,         # Username
    'password': str,     # Password
    # ... other psycopg2.connect() parameters
}

Pool State

PoolState = {
    'closed': bool,      # Pool closed status
    'minconn': int,      # Configured minimum connections
    'maxconn': int,      # Configured maximum connections
    '_pool': list,       # Available connections
    '_used': dict,       # Connections in use (key -> connection)
    '_rused': dict,      # Reverse mapping (connection_id -> key)
    '_keys': int         # Key counter
}

Thread Safety

# ThreadedConnectionPool uses threading.Lock for:
# - getconn() operations
# - putconn() operations  
# - closeall() operations
# - Internal pool state management

# SimpleConnectionPool provides no locking:
# - Suitable for single-threaded applications
# - Higher performance due to no locking overhead
# - Must not be shared between threads

Install with Tessl CLI

npx tessl i tessl/pypi-psycopg2-binary

docs

advanced-features.md

connection-pooling.md

connections-cursors.md

cursors-rows.md

error-handling.md

index.md

sql-composition.md

types-adaptation.md

tile.json