MySQL driver for asyncio providing async/await support for database operations.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
High-level database operations using SQLAlchemy's expression language and ORM capabilities with async/await support. The aiomysql.sa subpackage provides SQLAlchemy-compatible interfaces for advanced database operations.
import aiomysql.sa
from aiomysql.sa import create_engine, SAConnection, EngineCreate a SQLAlchemy-compatible async engine with connection pooling.
async def create_engine(
minsize: int = 1,
maxsize: int = 10,
loop = None,
dialect = None,
pool_recycle: int = -1,
compiled_cache = None,
**kwargs
) -> Engine:
"""
Create SQLAlchemy-compatible async engine.
Parameters:
- minsize: Minimum number of connections in pool
- maxsize: Maximum number of connections in pool
- loop: Event loop to use
- dialect: SQLAlchemy dialect instance
- pool_recycle: Seconds after which to recreate connections
- compiled_cache: Compiled query cache
- **kwargs: Connection parameters (same as connect())
Returns:
Engine context manager
"""The Engine class manages connection pools and provides SQLAlchemy-style database access.
class Engine:
@property
def dialect(self):
"""SQLAlchemy dialect instance."""
@property
def name(self) -> str:
"""Engine name."""
@property
def driver(self) -> str:
"""Database driver name."""
@property
def minsize(self) -> int:
"""Minimum pool size."""
@property
def maxsize(self) -> int:
"""Maximum pool size."""
@property
def size(self) -> int:
"""Current pool size."""
@property
def freesize(self) -> int:
"""Number of free connections."""
def acquire(self) -> SAConnection:
"""
Acquire a connection from the engine pool.
Returns:
SAConnection context manager
"""
def release(self, conn: SAConnection) -> None:
"""
Return a connection to the pool.
Parameters:
- conn: Connection to release
"""
def close(self) -> None:
"""Close the engine and mark connections for closure."""
def terminate(self) -> None:
"""Terminate engine immediately, closing all connections."""
async def wait_closed(self) -> None:
"""Wait for engine to be completely closed."""SQLAlchemy-style connection wrapper providing high-level database operations.
class SAConnection:
@property
def closed(self) -> bool:
"""Whether the connection is closed."""
@property
def connection(self) -> Connection:
"""Underlying aiomysql connection."""
@property
def in_transaction(self) -> bool:
"""Whether connection is in a transaction."""
def execute(self, query, *multiparams, **params) -> ResultProxy:
"""
Execute a SQLAlchemy query.
Parameters:
- query: SQLAlchemy selectable or text query
- multiparams: Multiple parameter sets for executemany-style
- params: Single parameter set
Returns:
ResultProxy for accessing results
"""
async def scalar(self, query, *multiparams, **params):
"""
Execute query and return scalar result.
Parameters:
- query: SQLAlchemy selectable or text query
- multiparams: Multiple parameter sets
- params: Single parameter set
Returns:
Single scalar value
"""
def begin(self) -> RootTransaction:
"""
Begin a transaction.
Returns:
Transaction context manager
"""
async def begin_nested(self) -> NestedTransaction:
"""
Begin a nested transaction (savepoint).
Returns:
Nested transaction instance
"""
async def begin_twophase(self, xid = None) -> TwoPhaseTransaction:
"""
Begin a two-phase transaction.
Parameters:
- xid: Transaction ID (generated if None)
Returns:
Two-phase transaction instance
"""
async def recover_twophase(self) -> list:
"""
Get list of prepared transaction IDs.
Returns:
List of transaction IDs ready for commit/rollback
"""
async def rollback_prepared(self, xid, *, is_prepared: bool = True) -> None:
"""
Rollback a prepared two-phase transaction.
Parameters:
- xid: Transaction ID to rollback
- is_prepared: Whether transaction is already prepared
"""
async def commit_prepared(self, xid, *, is_prepared: bool = True) -> None:
"""
Commit a prepared two-phase transaction.
Parameters:
- xid: Transaction ID to commit
- is_prepared: Whether transaction is already prepared
"""
async def close(self) -> None:
"""Close the connection."""Transaction management classes for various transaction types.
class Transaction:
@property
def is_active(self) -> bool:
"""Whether transaction is active."""
@property
def connection(self) -> SAConnection:
"""Connection associated with transaction."""
async def commit(self) -> None:
"""Commit the transaction."""
async def rollback(self) -> None:
"""Rollback the transaction."""
async def close(self) -> None:
"""Close the transaction."""
class RootTransaction(Transaction):
"""Root-level database transaction."""
class NestedTransaction(Transaction):
"""Nested transaction using savepoints."""
class TwoPhaseTransaction(Transaction):
"""Two-phase (XA) transaction."""
@property
def xid(self) -> str:
"""Transaction ID."""
async def prepare(self) -> None:
"""Prepare transaction for commit."""Classes for handling query results in SQLAlchemy style.
class ResultProxy:
@property
def dialect(self):
"""SQLAlchemy dialect."""
@property
def cursor(self) -> Cursor:
"""Underlying cursor."""
@property
def rowcount(self) -> int:
"""Number of affected rows."""
@property
def lastrowid(self) -> int:
"""Last inserted row ID."""
@property
def returns_rows(self) -> bool:
"""Whether query returns rows."""
@property
def closed(self) -> bool:
"""Whether result is closed."""
async def fetchone(self) -> RowProxy:
"""
Fetch next row.
Returns:
RowProxy instance or None
"""
async def fetchall(self) -> list:
"""
Fetch all remaining rows.
Returns:
List of RowProxy instances
"""
async def fetchmany(self, size: int = None) -> list:
"""
Fetch multiple rows.
Parameters:
- size: Number of rows to fetch
Returns:
List of RowProxy instances
"""
async def first(self) -> RowProxy:
"""
Fetch first row and close result.
Returns:
First RowProxy or None
"""
async def scalar(self):
"""
Fetch scalar value from first row.
Returns:
Single value from first column of first row
"""
def keys(self) -> list:
"""
Get column names.
Returns:
List of column names
"""
async def close(self) -> None:
"""Close the result."""
class RowProxy:
"""Single result row with dict-like and sequence-like access."""
def as_tuple(self) -> tuple:
"""
Convert row to tuple.
Returns:
Row data as tuple
"""
def __getitem__(self, key):
"""
Access column by name or index.
Parameters:
- key: Column name (str) or index (int)
Returns:
Column value
"""
def __getattr__(self, name):
"""
Access column as attribute.
Parameters:
- name: Column name
Returns:
Column value
"""
def __contains__(self, key) -> bool:
"""
Check if column exists.
Parameters:
- key: Column name
Returns:
True if column exists
"""SQLAlchemy integration specific exceptions.
class Error(Exception):
"""Base SQLAlchemy integration error."""
class ArgumentError(Error):
"""Invalid or conflicting function arguments."""
class InvalidRequestError(Error):
"""Invalid operations or runtime state errors."""
class NoSuchColumnError(Error):
"""Nonexistent column requested from RowProxy."""
class ResourceClosedError(Error):
"""Operation on closed connection/cursor/result."""import asyncio
import sqlalchemy as sa
import aiomysql.sa
async def sqlalchemy_basic_example():
# Create engine
engine = await aiomysql.sa.create_engine(
host='localhost',
port=3306,
user='myuser',
password='mypass',
db='mydatabase',
minsize=1,
maxsize=5
)
# Define table metadata
metadata = sa.MetaData()
users = sa.Table('users', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('name', sa.String(50)),
sa.Column('email', sa.String(100)))
# Execute queries
async with engine.acquire() as conn:
# Select query
query = sa.select([users]).where(users.c.id < 10)
result = await conn.execute(query)
async for row in result:
print(f"User: {row.name} ({row.email})")
await result.close()
# Cleanup
engine.close()
await engine.wait_closed()
asyncio.run(sqlalchemy_basic_example())async def transaction_example():
engine = await aiomysql.sa.create_engine(
host='localhost',
user='myuser',
password='mypass',
db='mydatabase'
)
metadata = sa.MetaData()
users = sa.Table('users', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('name', sa.String(50)),
sa.Column('balance', sa.Numeric(10, 2)))
async with engine.acquire() as conn:
# Begin transaction
async with conn.begin() as trans:
try:
# Transfer money between users
await conn.execute(
users.update().where(users.c.id == 1).values(
balance=users.c.balance - 100
)
)
await conn.execute(
users.update().where(users.c.id == 2).values(
balance=users.c.balance + 100
)
)
# Transaction commits automatically on success
print("Transfer completed successfully")
except Exception as e:
# Transaction rolls back automatically on error
print(f"Transfer failed: {e}")
raise
engine.close()
await engine.wait_closed()async def nested_transaction_example():
engine = await aiomysql.sa.create_engine(
host='localhost',
user='myuser',
password='mypass',
db='mydatabase'
)
metadata = sa.MetaData()
orders = sa.Table('orders', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('user_id', sa.Integer),
sa.Column('total', sa.Numeric(10, 2)))
async with engine.acquire() as conn:
async with conn.begin() as trans:
# Main transaction: create order
result = await conn.execute(
orders.insert().values(user_id=123, total=250.00)
)
order_id = result.lastrowid
# Nested transaction: try to apply discount
try:
async with conn.begin_nested() as nested_trans:
# Try to apply discount
await conn.execute(
orders.update().where(orders.c.id == order_id).values(
total=orders.c.total * 0.9 # 10% discount
)
)
# Simulate discount validation failure
if order_id % 2 == 0: # Even order IDs fail
raise ValueError("Discount not applicable")
print("Discount applied successfully")
except ValueError:
# Nested transaction rolls back, main continues
print("Discount failed, order created without discount")
print(f"Order {order_id} completed")
engine.close()
await engine.wait_closed()async def complex_query_example():
engine = await aiomysql.sa.create_engine(
host='localhost',
user='myuser',
password='mypass',
db='mydatabase'
)
metadata = sa.MetaData()
users = sa.Table('users', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('name', sa.String(50)),
sa.Column('department_id', sa.Integer))
departments = sa.Table('departments', metadata,
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('name', sa.String(50)))
async with engine.acquire() as conn:
# Complex join query
query = sa.select([
users.c.name.label('user_name'),
departments.c.name.label('dept_name')
]).select_from(
users.join(departments, users.c.department_id == departments.c.id)
).where(
departments.c.name.in_(['Engineering', 'Marketing'])
).order_by(users.c.name)
result = await conn.execute(query)
print("Users in Engineering and Marketing:")
rows = await result.fetchall()
for row in rows:
print(f"{row.user_name} - {row.dept_name}")
await result.close()
engine.close()
await engine.wait_closed()async def raw_sql_example():
engine = await aiomysql.sa.create_engine(
host='localhost',
user='myuser',
password='mypass',
db='mydatabase'
)
async with engine.acquire() as conn:
# Raw SQL with parameters
query = sa.text("""
SELECT u.name, COUNT(o.id) as order_count
FROM users u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.created_at >= :start_date
GROUP BY u.id, u.name
HAVING COUNT(o.id) >= :min_orders
ORDER BY order_count DESC
""")
result = await conn.execute(query, {
'start_date': '2023-01-01',
'min_orders': 5
})
print("Top customers by order count:")
async for row in result:
print(f"{row.name}: {row.order_count} orders")
await result.close()
engine.close()
await engine.wait_closed()Install with Tessl CLI
npx tessl i tessl/pypi-aiomysql