A self-contained Python driver for communicating with MySQL servers, using an API that is compliant with the Python Database API Specification v2.0 (PEP 249).
—
Perform database operations asynchronously using asyncio with full async/await support, providing non-blocking database access for high-concurrency applications.
import mysql.connector.aio
async def connect(**kwargs) -> 'MySQLConnection':
"""
Create async connection to MySQL server.
Returns:
MySQLConnection instance for async operations
"""
passclass MySQLConnection:
"""
Async connection class with asyncio support.
Provides non-blocking database operations using async/await.
"""
def __init__(self, **kwargs) -> None:
"""Initialize async connection with configuration."""
pass
async def connect(self) -> None:
"""Establish async connection to MySQL server."""
pass
async def disconnect(self) -> None:
"""Close async connection to MySQL server."""
pass
async def close(self) -> None:
"""Close connection (alias for disconnect)."""
pass
def is_connected(self) -> bool:
"""Check if connection is active (non-blocking check)."""
pass
async def ping(self, reconnect: bool = False, attempts: int = 1, delay: int = 0) -> None:
"""Test connection to server asynchronously."""
pass
async def reconnect(self, attempts: int = 1, delay: int = 0) -> None:
"""Reconnect to MySQL server asynchronously."""
pass
def cursor(self,
buffered: Optional[bool] = None,
raw: Optional[bool] = None,
prepared: Optional[bool] = None,
cursor_class: Optional[Type] = None,
dictionary: Optional[bool] = None) -> 'MySQLCursor':
"""Create async cursor for executing SQL statements."""
pass
async def commit(self) -> None:
"""Commit current transaction asynchronously."""
pass
async def rollback(self) -> None:
"""Rollback current transaction asynchronously."""
pass
async def start_transaction(self,
consistent_snapshot: bool = False,
isolation_level: Optional[str] = None,
readonly: Optional[bool] = None) -> None:
"""Start new transaction asynchronously."""
pass
@property
def autocommit(self) -> bool:
"""Get autocommit mode status."""
pass
async def set_autocommit(self, value: bool) -> None:
"""Set autocommit mode asynchronously."""
pass
@property
def database(self) -> str:
"""Get current database name."""
pass
async def set_database(self, value: str) -> None:
"""Change current database asynchronously."""
pass
@property
def server_version(self) -> Tuple[int, int, int]:
"""Get MySQL server version tuple."""
pass
@property
def connection_id(self) -> int:
"""Get MySQL connection ID."""
pass
@property
def charset(self) -> str:
"""Get connection character set."""
pass
async def set_charset(self, value: str) -> None:
"""Set connection character set asynchronously."""
pass
async def cmd_query(self, query: Union[str, bytes]) -> Dict:
"""Execute query asynchronously and return raw result."""
pass
async def cmd_quit(self) -> bytes:
"""Send quit command to server asynchronously."""
pass
async def cmd_init_db(self, database: str) -> bytes:
"""Send init_db command to change database asynchronously."""
pass
async def cmd_refresh(self, options: int) -> bytes:
"""Send refresh command asynchronously."""
pass
async def cmd_statistics(self) -> Dict:
"""Get server statistics asynchronously."""
pass
async def cmd_ping(self) -> bytes:
"""Send ping command to server asynchronously."""
pass
async def reset_session(self,
user_variables: Optional[Dict] = None,
session_variables: Optional[Dict] = None) -> None:
"""Reset session to initial state asynchronously."""
pass
async def get_warnings(self, count: Optional[int] = None) -> List[Tuple]:
"""Get warning messages from last statement asynchronously."""
pass
@property
def warning_count(self) -> int:
"""Get warning count from last statement."""
pass
@property
def info_msg(self) -> Optional[str]:
"""Get info message from last statement."""
pass
@property
def insert_id(self) -> int:
"""Get auto-generated ID from last INSERT."""
pass
@property
def affected_rows(self) -> int:
"""Get affected row count from last statement."""
pass
@property
def in_transaction(self) -> bool:
"""Check if connection is in transaction."""
pass
async def __aenter__(self) -> 'MySQLConnection':
"""Async context manager entry."""
pass
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Async context manager exit with automatic cleanup."""
passclass MySQLConnectionAbstract:
"""
Abstract base class for async connections.
Defines interface for async connection implementations.
"""
passclass MySQLCursor:
"""
Async cursor for executing SQL statements.
Provides non-blocking query execution and result fetching.
"""
async def execute(self, operation: str, params: Optional[Union[Sequence, Dict]] = None, multi: bool = False) -> Optional[AsyncIterator]:
"""Execute SQL statement asynchronously with optional parameters."""
pass
async def executemany(self, operation: str, seq_params: Sequence[Union[Sequence, Dict]]) -> None:
"""Execute SQL statement multiple times asynchronously."""
pass
async def fetchone(self) -> Optional[Tuple]:
"""Fetch next row from result set asynchronously."""
pass
async def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
"""Fetch specified number of rows asynchronously."""
pass
async def fetchall(self) -> List[Tuple]:
"""Fetch all remaining rows asynchronously."""
pass
async def close(self) -> None:
"""Close cursor and free resources asynchronously."""
pass
async def callproc(self, procname: str, args: Sequence = ()) -> Optional[Dict]:
"""Call stored procedure asynchronously."""
pass
def stored_results(self) -> AsyncIterator['MySQLCursor']:
"""Return async iterator for stored procedure result sets."""
pass
async def nextset(self) -> Optional[bool]:
"""Skip to next result set asynchronously."""
pass
@property
def description(self) -> Optional[List[Tuple]]:
"""Column metadata for last executed query."""
pass
@property
def rowcount(self) -> int:
"""Number of rows affected by last operation."""
pass
@property
def lastrowid(self) -> Optional[int]:
"""Auto-generated ID from last INSERT operation."""
pass
@property
def arraysize(self) -> int:
"""Default number of rows fetchmany() should return."""
pass
@arraysize.setter
def arraysize(self, value: int) -> None:
"""Set default fetchmany() size."""
pass
@property
def statement(self) -> Optional[str]:
"""Last executed SQL statement."""
pass
@property
def with_rows(self) -> bool:
"""Whether last operation produced result rows."""
pass
@property
def column_names(self) -> Tuple[str, ...]:
"""Column names from result set."""
pass
def __aiter__(self) -> 'MySQLCursor':
"""Make cursor async iterable over result rows."""
pass
async def __anext__(self) -> Tuple:
"""Get next row for async iteration."""
pass
async def __aenter__(self) -> 'MySQLCursor':
"""Async context manager entry."""
pass
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Async context manager exit with automatic cleanup."""
passclass MySQLConnectionPool:
"""
Async connection pool manager.
Manages pool of async database connections for efficient reuse.
"""
def __init__(self,
pool_name: Optional[str] = None,
pool_size: int = 5,
pool_reset_session: bool = True,
**kwargs) -> None:
"""Initialize async connection pool."""
pass
async def get_connection(self) -> 'PooledMySQLConnection':
"""Get connection from pool asynchronously."""
pass
async def add_connection(self, cnx: Optional['MySQLConnection'] = None) -> 'PooledMySQLConnection':
"""Add connection to pool asynchronously."""
pass
def set_config(self, **kwargs) -> None:
"""Update pool configuration."""
pass
async def close(self) -> None:
"""Close all connections in pool asynchronously."""
pass
@property
def pool_name(self) -> str:
"""Pool name identifier."""
pass
@property
def pool_size(self) -> int:
"""Maximum pool size."""
passclass PooledMySQLConnection:
"""
Async pooled connection wrapper.
Returns connection to pool on close in async context.
"""
def __init__(self, pool: MySQLConnectionPool, cnx: 'MySQLConnection') -> None:
"""Initialize async pooled connection wrapper."""
pass
async def close(self) -> None:
"""Return connection to pool asynchronously."""
pass
@property
def pool_name(self) -> str:
"""Name of the connection pool."""
pass
async def __aenter__(self) -> 'PooledMySQLConnection':
"""Async context manager entry."""
pass
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Async context manager exit returning connection to pool."""
passasync def connect(**kwargs) -> Union[MySQLConnection, PooledMySQLConnection]:
"""
Create async database connection with optional pooling.
When pool parameters are provided, returns PooledMySQLConnection.
Otherwise returns MySQLConnection.
"""
passimport asyncio
import mysql.connector.aio
async def main():
# Create async connection
connection = await mysql.connector.aio.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
)
# Create async cursor
cursor = connection.cursor()
# Execute query asynchronously
await cursor.execute("SELECT id, name FROM users WHERE age > %s", (25,))
# Fetch results asynchronously
async for (user_id, name) in cursor:
print(f"User {user_id}: {name}")
# Cleanup
await cursor.close()
await connection.close()
# Run async function
asyncio.run(main())import asyncio
import mysql.connector.aio
async def main():
# Automatic async connection cleanup
async with mysql.connector.aio.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
) as connection:
# Automatic async cursor cleanup
async with connection.cursor(dictionary=True) as cursor:
await cursor.execute("SELECT COUNT(*) as total FROM users")
result = await cursor.fetchone()
print(f"Total users: {result['total']}")
# Cursor automatically closed
# Connection automatically closed
asyncio.run(main())import asyncio
import mysql.connector.aio
async def transfer_funds(from_account: int, to_account: int, amount: float):
async with mysql.connector.aio.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
) as connection:
try:
# Start transaction
await connection.start_transaction()
async with connection.cursor() as cursor:
# Debit from account
await cursor.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_account)
)
# Credit to account
await cursor.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_account)
)
# Check if both operations affected rows
if cursor.rowcount == 0:
raise ValueError("Account not found")
# Commit transaction
await connection.commit()
print(f"Transferred {amount} from {from_account} to {to_account}")
except Exception as err:
# Rollback on error
await connection.rollback()
print(f"Transfer failed: {err}")
raise
asyncio.run(transfer_funds(1, 2, 100.0))import asyncio
import mysql.connector.aio
async def worker_task(worker_id: int):
"""Async worker using pooled connection."""
# Get connection from async pool
async with mysql.connector.aio.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase',
pool_name='async_pool',
pool_size=5
) as connection:
async with connection.cursor() as cursor:
await cursor.execute("SELECT SLEEP(%s)", (1,))
await cursor.fetchone()
print(f"Async worker {worker_id} completed")
async def main():
# Create multiple async tasks
tasks = [worker_task(i) for i in range(10)]
# Run tasks concurrently
await asyncio.gather(*tasks)
print("All async workers completed")
asyncio.run(main())import asyncio
import mysql.connector.aio
async def process_batch(batch_data: List[Dict]):
"""Process batch of data asynchronously."""
async with mysql.connector.aio.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
) as connection:
async with connection.cursor() as cursor:
# Prepare batch insert
insert_query = "INSERT INTO processed_data (id, value, timestamp) VALUES (%s, %s, NOW())"
# Execute batch asynchronously
for item in batch_data:
await cursor.execute(insert_query, (item['id'], item['value']))
await connection.commit()
print(f"Processed batch of {len(batch_data)} items")
async def main():
# Sample data batches
batches = [
[{'id': 1, 'value': 'A'}, {'id': 2, 'value': 'B'}],
[{'id': 3, 'value': 'C'}, {'id': 4, 'value': 'D'}],
[{'id': 5, 'value': 'E'}, {'id': 6, 'value': 'F'}]
]
# Process all batches concurrently
tasks = [process_batch(batch) for batch in batches]
await asyncio.gather(*tasks)
print("All batches processed")
asyncio.run(main())import asyncio
import mysql.connector.aio
async def stream_large_dataset():
"""Stream large dataset asynchronously."""
async with mysql.connector.aio.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase'
) as connection:
async with connection.cursor() as cursor:
await cursor.execute("SELECT * FROM large_table ORDER BY id")
# Stream results asynchronously
count = 0
async for row in cursor:
# Process each row as it arrives
print(f"Processing row {count}: {row[0]}")
count += 1
# Yield control to other tasks periodically
if count % 1000 == 0:
await asyncio.sleep(0) # Yield control
print(f"Streamed {count} rows")
asyncio.run(stream_large_dataset())import asyncio
import mysql.connector.aio
async def sync_data_between_databases():
"""Sync data between two databases asynchronously."""
# Connect to source database
source_conn = await mysql.connector.aio.connect(
host='source.mysql.example.com',
user='myuser',
password='mypassword',
database='source_db'
)
# Connect to destination database
dest_conn = await mysql.connector.aio.connect(
host='dest.mysql.example.com',
user='myuser',
password='mypassword',
database='dest_db'
)
try:
# Get cursors for both connections
source_cursor = source_conn.cursor(dictionary=True)
dest_cursor = dest_conn.cursor()
# Read from source
await source_cursor.execute("SELECT * FROM users WHERE updated_at > %s", ('2024-01-01',))
# Process and insert to destination
async for user in source_cursor:
await dest_cursor.execute(
"INSERT INTO users (id, name, email) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE name=%s, email=%s",
(user['id'], user['name'], user['email'], user['name'], user['email'])
)
# Commit destination changes
await dest_conn.commit()
print("Data sync completed")
finally:
# Cleanup both connections
await source_cursor.close()
await dest_cursor.close()
await source_conn.close()
await dest_conn.close()
asyncio.run(sync_data_between_databases())import asyncio
import mysql.connector.aio
async def query_with_timeout():
"""Execute query with timeout and error handling."""
try:
# Set timeout for entire operation
async with asyncio.timeout(30): # 30 second timeout
async with mysql.connector.aio.connect(
host='localhost',
user='myuser',
password='mypassword',
database='mydatabase',
connect_timeout=10 # Connection timeout
) as connection:
async with connection.cursor() as cursor:
# Long-running query
await cursor.execute("SELECT * FROM large_table WHERE complex_condition = %s", ('value',))
results = await cursor.fetchall()
print(f"Query returned {len(results)} rows")
except asyncio.TimeoutError:
print("Query timed out")
except mysql.connector.Error as err:
print(f"Database error: {err}")
except Exception as err:
print(f"Unexpected error: {err}")
asyncio.run(query_with_timeout())Install with Tessl CLI
npx tessl i tessl/pypi-mysql-connector-python