CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-mysql-connector-python

A self-contained Python driver for communicating with MySQL servers, using an API that is compliant with the Python Database API Specification v2.0 (PEP 249).

Pending
Overview
Eval results
Files

pooling.mddocs/

Connection Pooling

Manage connection pools for high-performance applications with automatic connection lifecycle management, configurable pool sizes, and connection reuse strategies.

Core Pooling Classes

MySQLConnectionPool

class MySQLConnectionPool:
    """
    Connection pool manager with configurable size and behavior.
    Manages a pool of database connections for efficient reuse.
    """
    
    def __init__(self, 
                 pool_name: Optional[str] = None,
                 pool_size: int = 5,
                 pool_reset_session: bool = True,
                 **kwargs) -> None:
        """
        Initialize connection pool.
        
        Args:
            pool_name: Unique identifier for the pool
            pool_size: Maximum number of connections (default: 5)
            pool_reset_session: Reset session on connection reuse (default: True)
            **kwargs: Connection parameters passed to individual connections
        """
        pass
    
    def get_connection(self) -> 'PooledMySQLConnection':
        """
        Get connection from pool.
        
        Returns:
            PooledMySQLConnection instance
            
        Raises:
            PoolError: When pool is exhausted and timeout reached
        """
        pass
    
    def add_connection(self, cnx: Optional['MySQLConnection'] = None) -> 'PooledMySQLConnection':
        """Add connection to pool or create new one."""
        pass
    
    def set_config(self, **kwargs) -> None:
        """Update pool configuration."""
        pass
    
    @property
    def pool_name(self) -> str:
        """Pool name identifier."""
        pass
    
    @pool_name.setter
    def pool_name(self, value: str) -> None:
        """Set pool name."""
        pass
    
    @property
    def pool_size(self) -> int:
        """Maximum pool size."""
        pass
    
    @pool_size.setter
    def pool_size(self, value: int) -> None:
        """Set maximum pool size."""
        pass
    
    @property
    def pool_reset_session(self) -> bool:
        """Whether to reset session on connection reuse."""
        pass
    
    @pool_reset_session.setter
    def pool_reset_session(self, value: bool) -> None:
        """Set session reset behavior."""
        pass
    
    def close(self) -> None:
        """Close all connections in pool."""
        pass
    
    def __del__(self) -> None:
        """Cleanup pool on deletion."""
        pass

PooledMySQLConnection

class PooledMySQLConnection:
    """
    Pooled connection wrapper that returns connection to pool on close.
    Provides same interface as MySQLConnection with pool integration.
    """
    
    def __init__(self, pool: MySQLConnectionPool, cnx: 'MySQLConnection') -> None:
        """Initialize pooled connection wrapper."""
        pass
    
    def close(self) -> None:
        """Return connection to pool instead of closing."""
        pass
    
    def config(self, **kwargs) -> None:
        """Configure underlying connection."""
        pass
    
    @property
    def pool_name(self) -> str:
        """Name of the connection pool."""
        pass
    
    def __getattr__(self, name: str) -> Any:
        """Delegate attribute access to underlying connection."""
        pass
    
    def __enter__(self) -> 'PooledMySQLConnection':
        """Context manager entry."""
        pass
    
    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Context manager exit returning connection to pool."""
        pass

Pooling Functions

connect() with Pooling

def connect(**kwargs) -> Union[MySQLConnection, PooledMySQLConnection]:
    """
    Create database connection with optional pooling.
    
    When pool parameters are provided, returns PooledMySQLConnection.
    Otherwise returns standard MySQLConnection.
    
    Pool Parameters:
        pool_name: Pool identifier
        pool_size: Maximum connections in pool
        pool_reset_session: Reset session variables on reuse
        pool_timeout: Maximum wait time for available connection
        
    Returns:
        PooledMySQLConnection when pooling, MySQLConnection otherwise
    """
    pass

def generate_pool_name(**kwargs) -> str:
    """
    Generate pool name from connection parameters.
    
    Creates unique pool name based on host, port, user, and database.
    
    Returns:
        Generated pool name string
    """
    pass

Pool Configuration

Basic Pool Parameters

pool_config = {
    'pool_name': str,           # Unique pool identifier
    'pool_size': int,           # Maximum connections (default: 5, max: 32)
    'pool_reset_session': bool, # Reset session on reuse (default: True)
    'pool_timeout': int,        # Wait timeout in seconds (default: 0 = no timeout)
}

Connection Parameters for Pool

All standard connection parameters can be used with pooling:

pooled_connection_config = {
    # Pool-specific
    'pool_name': 'myapp_pool',
    'pool_size': 10,
    'pool_reset_session': True,
    'pool_timeout': 30,
    
    # Connection parameters
    'host': 'localhost',
    'port': 3306,
    'user': 'myuser',
    'password': 'mypassword',
    'database': 'mydatabase',
    'charset': 'utf8mb4',
    'autocommit': False,
    
    # SSL parameters
    'ssl_disabled': False,
    'ssl_verify_cert': True,
    
    # Performance parameters
    'use_pure': False,  # Use C extension if available
    'buffered': True,
    'compress': False,
}

Usage Examples

Basic Connection Pool

import mysql.connector

# Create connection pool
config = {
    'host': 'localhost',
    'user': 'myuser', 
    'password': 'mypassword',
    'database': 'mydatabase',
    'pool_name': 'myapp_pool',
    'pool_size': 5
}

# Get pooled connection
connection = mysql.connector.connect(**config)
print(f"Connected via pool: {connection.pool_name}")

cursor = connection.cursor()
cursor.execute("SELECT COUNT(*) FROM users")
count = cursor.fetchone()[0]
print(f"User count: {count}")

cursor.close()
connection.close()  # Returns connection to pool

Explicit Pool Management

import mysql.connector

# Create pool explicitly
pool_config = {
    'host': 'localhost',
    'user': 'myuser',
    'password': 'mypassword', 
    'database': 'mydatabase'
}

pool = mysql.connector.pooling.MySQLConnectionPool(
    pool_name='explicit_pool',
    pool_size=8,
    pool_reset_session=True,
    **pool_config
)

# Get connections from pool
try:
    connection1 = pool.get_connection()
    connection2 = pool.get_connection()
    
    # Use connections
    cursor1 = connection1.cursor()
    cursor1.execute("SELECT 'Connection 1' as source")
    result1 = cursor1.fetchone()
    print(result1)
    
    cursor2 = connection2.cursor()
    cursor2.execute("SELECT 'Connection 2' as source")
    result2 = cursor2.fetchone()
    print(result2)
    
finally:
    # Return connections to pool
    cursor1.close()
    cursor2.close()
    connection1.close()
    connection2.close()
    
    # Close entire pool
    pool.close()

Pool with Context Managers

import mysql.connector

config = {
    'host': 'localhost',
    'user': 'myuser',
    'password': 'mypassword',
    'database': 'mydatabase',
    'pool_name': 'context_pool',
    'pool_size': 3
}

# Automatic connection return to pool
with mysql.connector.connect(**config) as connection:
    with connection.cursor(dictionary=True) as cursor:
        cursor.execute("SELECT id, name FROM users LIMIT 5")
        users = cursor.fetchall()
        
        for user in users:
            print(f"User {user['id']}: {user['name']}")
        # Cursor automatically closed
    # Connection automatically returned to pool

Multiple Workers with Shared Pool

import mysql.connector
import threading
import time

# Shared pool configuration
pool_config = {
    'host': 'localhost',
    'user': 'myuser',
    'password': 'mypassword',
    'database': 'mydatabase',
    'pool_name': 'worker_pool',
    'pool_size': 5,
    'pool_timeout': 10  # Wait up to 10 seconds for connection
}

def worker_task(worker_id: int):
    """Worker function that uses pooled connection."""
    try:
        # Get connection from shared pool
        connection = mysql.connector.connect(**pool_config)
        
        cursor = connection.cursor()
        cursor.execute("SELECT SLEEP(%s)", (1,))  # Simulate work
        cursor.fetchone()
        
        print(f"Worker {worker_id} completed using pool connection")
        
        cursor.close()
        connection.close()  # Return to pool
        
    except mysql.connector.PoolError as err:
        print(f"Worker {worker_id} failed to get connection: {err}")

# Create multiple worker threads
threads = []
for i in range(10):  # More workers than pool size
    thread = threading.Thread(target=worker_task, args=(i,))
    threads.append(thread)
    thread.start()

# Wait for all workers to complete
for thread in threads:
    thread.join()

print("All workers completed")

Pool with Failover Configuration

import mysql.connector

# Pool with failover servers
config = {
    'user': 'myuser',
    'password': 'mypassword',
    'database': 'mydatabase',
    'pool_name': 'failover_pool',
    'pool_size': 5,
    'failover': [
        {'host': 'primary.mysql.example.com', 'port': 3306},
        {'host': 'secondary.mysql.example.com', 'port': 3306},
        {'host': 'tertiary.mysql.example.com', 'port': 3306}
    ]
}

try:
    connection = mysql.connector.connect(**config)
    print(f"Connected to MySQL via pool: {connection.pool_name}")
    
    cursor = connection.cursor()
    cursor.execute("SELECT @@hostname as server")
    server = cursor.fetchone()[0]
    print(f"Connected to server: {server}")
    
    cursor.close()
    connection.close()
    
except mysql.connector.Error as err:
    print(f"Connection failed: {err}")

Pool Monitoring

import mysql.connector
import threading

# Create pool
pool = mysql.connector.pooling.MySQLConnectionPool(
    pool_name='monitored_pool',
    pool_size=3,
    host='localhost',
    user='myuser',
    password='mypassword',
    database='mydatabase'
)

def monitor_pool():
    """Monitor pool usage."""
    while True:
        # Note: These are conceptual - actual implementation may vary
        print(f"Pool size: {pool.pool_size}")
        print(f"Pool name: {pool.pool_name}")
        time.sleep(5)

# Start monitoring thread
monitor_thread = threading.Thread(target=monitor_pool, daemon=True)
monitor_thread.start()

# Use pool connections
connections = []
try:
    # Get multiple connections
    for i in range(3):
        conn = pool.get_connection()
        connections.append(conn)
        print(f"Got connection {i+1}")
        
    # Try to get one more (should wait or fail based on timeout)
    try:
        extra_conn = pool.get_connection()
        print("Got extra connection")
        connections.append(extra_conn)
    except mysql.connector.PoolError as err:
        print(f"Pool exhausted: {err}")
        
finally:
    # Return all connections
    for conn in connections:
        conn.close()
    pool.close()

Advanced Pool Configuration

import mysql.connector

# Advanced pool with session reset
config = {
    'host': 'localhost',
    'user': 'myuser',
    'password': 'mypassword',
    'database': 'mydatabase',
    'pool_name': 'advanced_pool',
    'pool_size': 10,
    'pool_reset_session': True,  # Reset session variables
    'pool_timeout': 30,          # Wait timeout
    
    # Connection tuning
    'connect_timeout': 10,
    'read_timeout': 30,
    'write_timeout': 30,
    'charset': 'utf8mb4',
    'collation': 'utf8mb4_unicode_ci',
    'autocommit': False,
    'buffered': True,
    'use_pure': False,  # Use C extension
    
    # SSL configuration
    'ssl_disabled': False,
    'ssl_verify_cert': True,
    'ssl_verify_identity': True,
}

# Use advanced pool
connection = mysql.connector.connect(**config)

# Connection has session reset to default state
cursor = connection.cursor()
cursor.execute("SELECT @@autocommit, @@sql_mode")
settings = cursor.fetchone()
print(f"Autocommit: {settings[0]}, SQL Mode: {settings[1]}")

cursor.close()
connection.close()

Install with Tessl CLI

npx tessl i tessl/pypi-mysql-connector-python

docs

async.md

auth.md

connection.md

constants.md

cursors.md

errors.md

index.md

pooling.md

types.md

utilities.md

tile.json