A self-contained Python driver for communicating with MySQL servers, using an API that is compliant with the Python Database API Specification v2.0 (PEP 249).
—
Manage connection pools for high-performance applications with automatic connection lifecycle management, configurable pool sizes, and connection reuse strategies.
class MySQLConnectionPool:
"""
Connection pool manager with configurable size and behavior.
Manages a pool of database connections for efficient reuse.
"""
def __init__(self,
pool_name: Optional[str] = None,
pool_size: int = 5,
pool_reset_session: bool = True,
**kwargs) -> None:
"""
Initialize connection pool.
Args:
pool_name: Unique identifier for the pool
pool_size: Maximum number of connections (default: 5)
pool_reset_session: Reset session on connection reuse (default: True)
**kwargs: Connection parameters passed to individual connections
"""
pass
def get_connection(self) -> 'PooledMySQLConnection':
"""
Get connection from pool.
Returns:
PooledMySQLConnection instance
Raises:
PoolError: When pool is exhausted and timeout reached
"""
pass
def add_connection(self, cnx: Optional['MySQLConnection'] = None) -> 'PooledMySQLConnection':
"""Add connection to pool or create new one."""
pass
def set_config(self, **kwargs) -> None:
"""Update pool configuration."""
pass
@property
def pool_name(self) -> str:
"""Pool name identifier."""
pass
@pool_name.setter
def pool_name(self, value: str) -> None:
"""Set pool name."""
pass
@property
def pool_size(self) -> int:
"""Maximum pool size."""
pass
@pool_size.setter
def pool_size(self, value: int) -> None:
"""Set maximum pool size."""
pass
@property
def pool_reset_session(self) -> bool:
"""Whether to reset session on connection reuse."""
pass
@pool_reset_session.setter
def pool_reset_session(self, value: bool) -> None:
"""Set session reset behavior."""
pass
def close(self) -> None:
"""Close all connections in pool."""
pass
def __del__(self) -> None:
"""Cleanup pool on deletion."""
passclass PooledMySQLConnection:
"""
Pooled connection wrapper that returns connection to pool on close.
Provides same interface as MySQLConnection with pool integration.
"""
def __init__(self, pool: MySQLConnectionPool, cnx: 'MySQLConnection') -> None:
"""Initialize pooled connection wrapper."""
pass
def close(self) -> None:
"""Return connection to pool instead of closing."""
pass
def config(self, **kwargs) -> None:
"""Configure underlying connection."""
pass
@property
def pool_name(self) -> str:
"""Name of the connection pool."""
pass
def __getattr__(self, name: str) -> Any:
"""Delegate attribute access to underlying connection."""
pass
def __enter__(self) -> 'PooledMySQLConnection':
"""Context manager entry."""
pass
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Context manager exit returning connection to pool."""
passdef connect(**kwargs) -> Union[MySQLConnection, PooledMySQLConnection]:
"""
Create database connection with optional pooling.
When pool parameters are provided, returns PooledMySQLConnection.
Otherwise returns standard MySQLConnection.
Pool Parameters:
pool_name: Pool identifier
pool_size: Maximum connections in pool
pool_reset_session: Reset session variables on reuse
pool_timeout: Maximum wait time for available connection
Returns:
PooledMySQLConnection when pooling, MySQLConnection otherwise
"""
pass
def generate_pool_name(**kwargs) -> str:
"""
Generate pool name from connection parameters.
Creates unique pool name based on host, port, user, and database.
Returns:
Generated pool name string
"""
passpool_config = {
'pool_name': str, # Unique pool identifier
'pool_size': int, # Maximum connections (default: 5, max: 32)
'pool_reset_session': bool, # Reset session on reuse (default: True)
'pool_timeout': int, # Wait timeout in seconds (default: 0 = no timeout)
}All standard connection parameters can be used with pooling:
pooled_connection_config = {
# Pool-specific
'pool_name': 'myapp_pool',
'pool_size': 10,
'pool_reset_session': True,
'pool_timeout': 30,
# Connection parameters
'host': 'localhost',
'port': 3306,
'user': 'myuser',
'password': 'mypassword',
'database': 'mydatabase',
'charset': 'utf8mb4',
'autocommit': False,
# SSL parameters
'ssl_disabled': False,
'ssl_verify_cert': True,
# Performance parameters
'use_pure': False, # Use C extension if available
'buffered': True,
'compress': False,
}import mysql.connector
# Create connection pool
config = {
'host': 'localhost',
'user': 'myuser',
'password': 'mypassword',
'database': 'mydatabase',
'pool_name': 'myapp_pool',
'pool_size': 5
}
# Get pooled connection
connection = mysql.connector.connect(**config)
print(f"Connected via pool: {connection.pool_name}")
cursor = connection.cursor()
cursor.execute("SELECT COUNT(*) FROM users")
count = cursor.fetchone()[0]
print(f"User count: {count}")
cursor.close()
connection.close() # Returns connection to poolimport mysql.connector
# Create pool explicitly
pool_config = {
'host': 'localhost',
'user': 'myuser',
'password': 'mypassword',
'database': 'mydatabase'
}
pool = mysql.connector.pooling.MySQLConnectionPool(
pool_name='explicit_pool',
pool_size=8,
pool_reset_session=True,
**pool_config
)
# Get connections from pool
try:
connection1 = pool.get_connection()
connection2 = pool.get_connection()
# Use connections
cursor1 = connection1.cursor()
cursor1.execute("SELECT 'Connection 1' as source")
result1 = cursor1.fetchone()
print(result1)
cursor2 = connection2.cursor()
cursor2.execute("SELECT 'Connection 2' as source")
result2 = cursor2.fetchone()
print(result2)
finally:
# Return connections to pool
cursor1.close()
cursor2.close()
connection1.close()
connection2.close()
# Close entire pool
pool.close()import mysql.connector
config = {
'host': 'localhost',
'user': 'myuser',
'password': 'mypassword',
'database': 'mydatabase',
'pool_name': 'context_pool',
'pool_size': 3
}
# Automatic connection return to pool
with mysql.connector.connect(**config) as connection:
with connection.cursor(dictionary=True) as cursor:
cursor.execute("SELECT id, name FROM users LIMIT 5")
users = cursor.fetchall()
for user in users:
print(f"User {user['id']}: {user['name']}")
# Cursor automatically closed
# Connection automatically returned to poolimport mysql.connector
import threading
import time
# Shared pool configuration
pool_config = {
'host': 'localhost',
'user': 'myuser',
'password': 'mypassword',
'database': 'mydatabase',
'pool_name': 'worker_pool',
'pool_size': 5,
'pool_timeout': 10 # Wait up to 10 seconds for connection
}
def worker_task(worker_id: int):
"""Worker function that uses pooled connection."""
try:
# Get connection from shared pool
connection = mysql.connector.connect(**pool_config)
cursor = connection.cursor()
cursor.execute("SELECT SLEEP(%s)", (1,)) # Simulate work
cursor.fetchone()
print(f"Worker {worker_id} completed using pool connection")
cursor.close()
connection.close() # Return to pool
except mysql.connector.PoolError as err:
print(f"Worker {worker_id} failed to get connection: {err}")
# Create multiple worker threads
threads = []
for i in range(10): # More workers than pool size
thread = threading.Thread(target=worker_task, args=(i,))
threads.append(thread)
thread.start()
# Wait for all workers to complete
for thread in threads:
thread.join()
print("All workers completed")import mysql.connector
# Pool with failover servers
config = {
'user': 'myuser',
'password': 'mypassword',
'database': 'mydatabase',
'pool_name': 'failover_pool',
'pool_size': 5,
'failover': [
{'host': 'primary.mysql.example.com', 'port': 3306},
{'host': 'secondary.mysql.example.com', 'port': 3306},
{'host': 'tertiary.mysql.example.com', 'port': 3306}
]
}
try:
connection = mysql.connector.connect(**config)
print(f"Connected to MySQL via pool: {connection.pool_name}")
cursor = connection.cursor()
cursor.execute("SELECT @@hostname as server")
server = cursor.fetchone()[0]
print(f"Connected to server: {server}")
cursor.close()
connection.close()
except mysql.connector.Error as err:
print(f"Connection failed: {err}")import mysql.connector
import threading
# Create pool
pool = mysql.connector.pooling.MySQLConnectionPool(
pool_name='monitored_pool',
pool_size=3,
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
)
def monitor_pool():
"""Monitor pool usage."""
while True:
# Note: These are conceptual - actual implementation may vary
print(f"Pool size: {pool.pool_size}")
print(f"Pool name: {pool.pool_name}")
time.sleep(5)
# Start monitoring thread
monitor_thread = threading.Thread(target=monitor_pool, daemon=True)
monitor_thread.start()
# Use pool connections
connections = []
try:
# Get multiple connections
for i in range(3):
conn = pool.get_connection()
connections.append(conn)
print(f"Got connection {i+1}")
# Try to get one more (should wait or fail based on timeout)
try:
extra_conn = pool.get_connection()
print("Got extra connection")
connections.append(extra_conn)
except mysql.connector.PoolError as err:
print(f"Pool exhausted: {err}")
finally:
# Return all connections
for conn in connections:
conn.close()
pool.close()import mysql.connector
# Advanced pool with session reset
config = {
'host': 'localhost',
'user': 'myuser',
'password': 'mypassword',
'database': 'mydatabase',
'pool_name': 'advanced_pool',
'pool_size': 10,
'pool_reset_session': True, # Reset session variables
'pool_timeout': 30, # Wait timeout
# Connection tuning
'connect_timeout': 10,
'read_timeout': 30,
'write_timeout': 30,
'charset': 'utf8mb4',
'collation': 'utf8mb4_unicode_ci',
'autocommit': False,
'buffered': True,
'use_pure': False, # Use C extension
# SSL configuration
'ssl_disabled': False,
'ssl_verify_cert': True,
'ssl_verify_identity': True,
}
# Use advanced pool
connection = mysql.connector.connect(**config)
# Connection has session reset to default state
cursor = connection.cursor()
cursor.execute("SELECT @@autocommit, @@sql_mode")
settings = cursor.fetchone()
print(f"Autocommit: {settings[0]}, SQL Mode: {settings[1]}")
cursor.close()
connection.close()Install with Tessl CLI
npx tessl i tessl/pypi-mysql-connector-python