Python MySQL Driver using Cython for high-performance database connectivity with async support
—
Asyncio-compatible database operations including async connections, cursors, and connection pooling for high-performance concurrent database access.
Create asynchronous database connections that integrate with Python's asyncio event loop for non-blocking database operations.
async def connect(host="localhost", user=None, passwd="", db=None, port=3306,
unix_socket=None, charset='', sql_mode=None, read_default_file=None,
client_flag=0, cursorclass=None, init_command=None, connect_timeout=None,
ssl=None, read_default_group=None, compress="", zstd_compression_level=3,
named_pipe=None, conv=None, encoders=None, loop=None):
"""
Create an asynchronous database connection.
Parameters:
- host (str): MySQL server hostname or IP address (default: "localhost")
- user (str): Username for authentication
- passwd (str): Password for authentication (default: "")
- port (int): MySQL server port number (default: 3306)
- db (str): Default database name
- unix_socket (str): Unix socket path for local connections
- charset (str): Character set for connection (default: '')
- sql_mode (str): SQL mode setting for connection
- read_default_file (str): MySQL configuration file path
- client_flag (int): Custom flags to send to MySQL
- cursorclass: Default cursor class for this connection
- init_command (str): SQL command to run on connection
- connect_timeout (int): Connection timeout in seconds
- ssl (dict): SSL configuration parameters
- read_default_group (str): Group to read from configuration file
- compress (str): Compression algorithm ("zlib" or "zstd")
- zstd_compression_level (int): ZSTD compression level (1-22, default: 3)
- named_pipe: Not supported (raises NotImplementedError)
- conv: Decoders dictionary for custom type marshalling
- encoders: Encoders dictionary for custom type marshalling
- loop: Event loop to use (default: current event loop)
Returns:
AsyncConnection: Asynchronous database connection object
Raises:
OperationalError: Connection failed
InterfaceError: Invalid connection parameters
"""The AsyncConnection class provides asynchronous methods for managing database connections and executing operations without blocking the event loop.
class AsyncConnection:
async def cursor(self, cursor=None):
"""
Create a new async cursor object.
Parameters:
- cursor: Async cursor class to instantiate (optional)
Returns:
AsyncCursor: New async cursor object
"""
async def commit(self):
"""
Commit current transaction asynchronously.
Raises:
OperationalError: Transaction commit failed
"""
async def rollback(self):
"""
Roll back current transaction asynchronously.
Raises:
OperationalError: Transaction rollback failed
"""
def close(self):
"""
Close the database connection.
"""
async def ensure_closed(self):
"""
Ensure connection is properly closed and cleaned up.
"""
async def autocommit(self, value):
"""
Enable or disable autocommit mode asynchronously.
Parameters:
- value (bool): True to enable autocommit, False to disable
"""
async def ping(self):
"""
Check if connection to server is alive asynchronously.
Raises:
OperationalError: Connection check failed
"""
async def set_charset(self, charset):
"""
Set connection character set asynchronously.
Parameters:
- charset (str): Character set name
"""Asynchronous cursor providing non-blocking SQL execution and result retrieval.
class AsyncCursor:
async def execute(self, query, args=None):
"""
Execute a SQL statement asynchronously.
Parameters:
- query (str): SQL statement with optional %s placeholders
- args (tuple/list/dict): Parameters to bind to placeholders
Returns:
int: Number of affected rows
Raises:
ProgrammingError: SQL syntax error
OperationalError: Database operation error
"""
async def executemany(self, query, args_list):
"""
Execute a SQL statement multiple times asynchronously.
Parameters:
- query (str): SQL statement with %s placeholders
- args_list (list/tuple): Sequence of parameter tuples/lists
Returns:
int: Number of affected rows from all executions
"""
async def fetchone(self):
"""
Fetch the next row asynchronously.
Returns:
tuple: Next row data, or None if no more rows
"""
async def fetchmany(self, size=None):
"""
Fetch multiple rows asynchronously.
Parameters:
- size (int): Number of rows to fetch (default: arraysize)
Returns:
list: List of row tuples
"""
async def fetchall(self):
"""
Fetch all remaining rows asynchronously.
Returns:
list: List of all remaining row tuples
"""
def close(self):
"""
Close the cursor and free resources.
"""
async def __aenter__(self):
"""Async context manager entry."""
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""Async cursor that returns rows as dictionaries instead of tuples.
class AsyncDictCursor(AsyncCursor):
"""
Async cursor that returns rows as dictionaries with column names as keys.
Each row is returned as a dictionary mapping column names to values,
making it easier to access specific columns by name in async operations.
"""
async def execute(self, query, args=None):
"""Execute query and prepare field mapping for dictionary results."""
async def fetchone(self):
"""
Fetch the next row as a dictionary asynchronously.
Returns:
dict: Row data as {column_name: value} mapping, or None if no more rows
"""
async def fetchmany(self, size=None):
"""
Fetch multiple rows as dictionaries asynchronously.
Parameters:
- size (int): Number of rows to fetch (default: arraysize)
Returns:
tuple: Tuple of dictionaries representing rows
"""
async def fetchall(self):
"""
Fetch all remaining rows as dictionaries asynchronously.
Returns:
tuple: Tuple of dictionaries representing all remaining rows
"""Efficient connection pooling for high-throughput async applications with automatic connection lifecycle management.
def create_pool(minsize=1, maxsize=10, pool_recycle=-1, loop=None, **kwargs):
"""
Create an async connection pool context manager.
Parameters:
- minsize (int): Minimum number of connections in pool
- maxsize (int): Maximum number of connections in pool
- pool_recycle (int): Connection recycle time in seconds (-1 = no recycle)
- loop: Event loop to use (default: current event loop)
- **kwargs: Connection parameters for pool connections
Returns:
PoolContextManager: Context manager for connection pool
"""
async def _create_pool(minsize=1, maxsize=10, pool_recycle=-1, loop=None, **kwargs):
"""
Create an async connection pool directly.
Returns:
Pool: Connection pool object
"""
class Pool:
"""
Async connection pool managing multiple database connections.
"""
def acquire(self):
"""
Get a connection from the pool.
Returns:
PoolAcquireContextManager: Context manager for pool connection
"""
async def _acquire(self):
"""
Acquire a connection from the pool directly.
Returns:
AsyncConnection: Database connection from pool
"""
def release(self, conn):
"""
Return a connection to the pool.
Parameters:
- conn: Connection to return to pool
"""
def close(self):
"""
Close the pool and all connections.
"""
async def wait_closed(self):
"""
Wait for pool closure to complete.
"""
@property
def size(self):
"""Current number of connections in pool."""
@property
def freesize(self):
"""Number of free connections in pool."""import asyncio
import cymysql.aio
async def basic_query():
conn = await cymysql.aio.connect(
host='localhost',
user='root',
password='password',
db='testdb'
)
cursor = await conn.cursor()
await cursor.execute("SELECT COUNT(*) FROM users")
result = await cursor.fetchone()
print(f"User count: {result[0]}")
cursor.close()
conn.close()
asyncio.run(basic_query())import asyncio
import cymysql.aio
async def context_manager_example():
conn = await cymysql.aio.connect(
host='localhost',
user='root',
password='password',
db='testdb'
)
# Cursor context manager
async with conn.cursor() as cursor:
await cursor.execute("SELECT id, name FROM users WHERE active = %s", (True,))
async for row in cursor:
print(f"User: {row[1]} (ID: {row[0]})")
conn.close()
asyncio.run(context_manager_example())import asyncio
import cymysql.aio
async def pool_example():
# Create connection pool
pool = await cymysql.aio.create_pool(
host='localhost',
user='root',
password='password',
db='testdb',
minsize=1,
maxsize=10
)
# Use pool connection
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT VERSION()")
version = await cursor.fetchone()
print(f"MySQL version: {version[0]}")
# Clean up pool
pool.close()
await pool.wait_closed()
asyncio.run(pool_example())import asyncio
import cymysql.aio
async def fetch_user_data(pool, user_id):
async with pool.acquire() as conn:
async with conn.cursor(cymysql.aio.AsyncDictCursor) as cursor:
await cursor.execute(
"SELECT id, name, email FROM users WHERE id = %s",
(user_id,)
)
return await cursor.fetchone()
async def concurrent_example():
pool = await cymysql.aio.create_pool(
host='localhost',
user='root',
password='password',
db='testdb',
minsize=5,
maxsize=20
)
# Fetch multiple users concurrently
user_ids = [1, 2, 3, 4, 5]
tasks = [fetch_user_data(pool, uid) for uid in user_ids]
users = await asyncio.gather(*tasks)
for user in users:
if user:
print(f"User: {user['name']} ({user['email']})")
pool.close()
await pool.wait_closed()
asyncio.run(concurrent_example())import asyncio
import cymysql.aio
async def transfer_funds(pool, from_account, to_account, amount):
async with pool.acquire() as conn:
try:
# Disable autocommit for transaction
await conn.autocommit(False)
async with conn.cursor() as cursor:
# Debit from source account
await cursor.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_account)
)
# Credit to destination account
await cursor.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_account)
)
# Commit transaction
await conn.commit()
print(f"Transfer of ${amount} completed successfully")
except Exception as e:
# Rollback on error
await conn.rollback()
print(f"Transfer failed: {e}")
finally:
# Re-enable autocommit
await conn.autocommit(True)
async def transaction_example():
pool = await cymysql.aio.create_pool(
host='localhost',
user='root',
password='password',
db='banking'
)
await transfer_funds(pool, from_account=1, to_account=2, amount=100.00)
pool.close()
await pool.wait_closed()
asyncio.run(transaction_example())import asyncio
import cymysql.aio
async def process_large_dataset(pool):
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM large_table")
# Process results in batches to avoid memory issues
while True:
rows = await cursor.fetchmany(1000)
if not rows:
break
# Process batch
for row in rows:
await process_row_async(row)
print(f"Processed batch of {len(rows)} rows")
async def process_row_async(row):
# Simulate async processing
await asyncio.sleep(0.001)
asyncio.run(process_large_dataset(pool))import asyncio
import cymysql.aio
from cymysql import OperationalError, ProgrammingError
async def robust_async_operation():
pool = None
try:
pool = await cymysql.aio.create_pool(
host='localhost',
user='root',
password='password',
db='testdb',
minsize=1,
maxsize=5
)
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT COUNT(*) FROM nonexistent_table")
result = await cursor.fetchone()
except OperationalError as e:
print(f"Database operation error: {e}")
except ProgrammingError as e:
print(f"SQL programming error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
finally:
if pool:
pool.close()
await pool.wait_closed()
asyncio.run(robust_async_operation())minsize and maxsize for your application's concurrency needsfetchmany() for large result sets to control memory usagepool_recycle parameter for long-running applicationsInstall with Tessl CLI
npx tessl i tessl/pypi-cymysql