CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-aiomysql

MySQL driver for asyncio providing async/await support for database operations.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

sqlalchemy.mddocs/

SQLAlchemy Integration

High-level database operations using SQLAlchemy's expression language and ORM capabilities with async/await support. The aiomysql.sa subpackage provides SQLAlchemy-compatible interfaces for advanced database operations.

Core Imports

import aiomysql.sa
from aiomysql.sa import create_engine, SAConnection, Engine

Capabilities

Engine Creation

Create a SQLAlchemy-compatible async engine with connection pooling.

async def create_engine(
    minsize: int = 1,
    maxsize: int = 10,
    loop = None,
    dialect = None,
    pool_recycle: int = -1,
    compiled_cache = None,
    **kwargs
) -> Engine:
    """
    Create SQLAlchemy-compatible async engine.

    Parameters:
    - minsize: Minimum number of connections in pool
    - maxsize: Maximum number of connections in pool
    - loop: Event loop to use
    - dialect: SQLAlchemy dialect instance
    - pool_recycle: Seconds after which to recreate connections
    - compiled_cache: Compiled query cache
    - **kwargs: Connection parameters (same as connect())

    Returns:
    Engine context manager
    """

Engine Management

The Engine class manages connection pools and provides SQLAlchemy-style database access.

class Engine:
    @property
    def dialect(self):
        """SQLAlchemy dialect instance."""
    
    @property
    def name(self) -> str:
        """Engine name."""
    
    @property
    def driver(self) -> str:
        """Database driver name."""
    
    @property
    def minsize(self) -> int:
        """Minimum pool size."""
    
    @property
    def maxsize(self) -> int:
        """Maximum pool size."""
    
    @property
    def size(self) -> int:
        """Current pool size."""
    
    @property
    def freesize(self) -> int:
        """Number of free connections."""
    
    def acquire(self) -> SAConnection:
        """
        Acquire a connection from the engine pool.
        
        Returns:
        SAConnection context manager
        """
    
    def release(self, conn: SAConnection) -> None:
        """
        Return a connection to the pool.
        
        Parameters:
        - conn: Connection to release
        """
    
    def close(self) -> None:
        """Close the engine and mark connections for closure."""
    
    def terminate(self) -> None:
        """Terminate engine immediately, closing all connections."""
    
    async def wait_closed(self) -> None:
        """Wait for engine to be completely closed."""

SQLAlchemy Connection

SQLAlchemy-style connection wrapper providing high-level database operations.

class SAConnection:
    @property
    def closed(self) -> bool:
        """Whether the connection is closed."""
    
    @property
    def connection(self) -> Connection:
        """Underlying aiomysql connection."""
    
    @property
    def in_transaction(self) -> bool:
        """Whether connection is in a transaction."""
    
    def execute(self, query, *multiparams, **params) -> ResultProxy:
        """
        Execute a SQLAlchemy query.
        
        Parameters:
        - query: SQLAlchemy selectable or text query
        - multiparams: Multiple parameter sets for executemany-style
        - params: Single parameter set
        
        Returns:
        ResultProxy for accessing results
        """
    
    async def scalar(self, query, *multiparams, **params):
        """
        Execute query and return scalar result.
        
        Parameters:
        - query: SQLAlchemy selectable or text query
        - multiparams: Multiple parameter sets
        - params: Single parameter set
        
        Returns:
        Single scalar value
        """
    
    def begin(self) -> RootTransaction:
        """
        Begin a transaction.
        
        Returns:
        Transaction context manager
        """
    
    async def begin_nested(self) -> NestedTransaction:
        """
        Begin a nested transaction (savepoint).
        
        Returns:
        Nested transaction instance
        """
    
    async def begin_twophase(self, xid = None) -> TwoPhaseTransaction:
        """
        Begin a two-phase transaction.
        
        Parameters:
        - xid: Transaction ID (generated if None)
        
        Returns:
        Two-phase transaction instance
        """
    
    async def recover_twophase(self) -> list:
        """
        Get list of prepared transaction IDs.
        
        Returns:
        List of transaction IDs ready for commit/rollback
        """
    
    async def rollback_prepared(self, xid, *, is_prepared: bool = True) -> None:
        """
        Rollback a prepared two-phase transaction.
        
        Parameters:
        - xid: Transaction ID to rollback
        - is_prepared: Whether transaction is already prepared
        """
    
    async def commit_prepared(self, xid, *, is_prepared: bool = True) -> None:
        """
        Commit a prepared two-phase transaction.
        
        Parameters:
        - xid: Transaction ID to commit
        - is_prepared: Whether transaction is already prepared
        """
    
    async def close(self) -> None:
        """Close the connection."""

Transaction Classes

Transaction management classes for various transaction types.

class Transaction:
    @property
    def is_active(self) -> bool:
        """Whether transaction is active."""
    
    @property
    def connection(self) -> SAConnection:
        """Connection associated with transaction."""
    
    async def commit(self) -> None:
        """Commit the transaction."""
    
    async def rollback(self) -> None:
        """Rollback the transaction."""
    
    async def close(self) -> None:
        """Close the transaction."""

class RootTransaction(Transaction):
    """Root-level database transaction."""

class NestedTransaction(Transaction):
    """Nested transaction using savepoints."""

class TwoPhaseTransaction(Transaction):
    """Two-phase (XA) transaction."""
    
    @property
    def xid(self) -> str:
        """Transaction ID."""
    
    async def prepare(self) -> None:
        """Prepare transaction for commit."""

Result Classes

Classes for handling query results in SQLAlchemy style.

class ResultProxy:
    @property
    def dialect(self):
        """SQLAlchemy dialect."""
    
    @property
    def cursor(self) -> Cursor:
        """Underlying cursor."""
    
    @property
    def rowcount(self) -> int:
        """Number of affected rows."""
    
    @property
    def lastrowid(self) -> int:
        """Last inserted row ID."""
    
    @property
    def returns_rows(self) -> bool:
        """Whether query returns rows."""
    
    @property
    def closed(self) -> bool:
        """Whether result is closed."""
    
    async def fetchone(self) -> RowProxy:
        """
        Fetch next row.
        
        Returns:
        RowProxy instance or None
        """
    
    async def fetchall(self) -> list:
        """
        Fetch all remaining rows.
        
        Returns:
        List of RowProxy instances
        """
    
    async def fetchmany(self, size: int = None) -> list:
        """
        Fetch multiple rows.
        
        Parameters:
        - size: Number of rows to fetch
        
        Returns:
        List of RowProxy instances
        """
    
    async def first(self) -> RowProxy:
        """
        Fetch first row and close result.
        
        Returns:
        First RowProxy or None
        """
    
    async def scalar(self):
        """
        Fetch scalar value from first row.
        
        Returns:
        Single value from first column of first row
        """
    
    def keys(self) -> list:
        """
        Get column names.
        
        Returns:
        List of column names
        """
    
    async def close(self) -> None:
        """Close the result."""

class RowProxy:
    """Single result row with dict-like and sequence-like access."""
    
    def as_tuple(self) -> tuple:
        """
        Convert row to tuple.
        
        Returns:
        Row data as tuple
        """
    
    def __getitem__(self, key):
        """
        Access column by name or index.
        
        Parameters:
        - key: Column name (str) or index (int)
        
        Returns:
        Column value
        """
    
    def __getattr__(self, name):
        """
        Access column as attribute.
        
        Parameters:
        - name: Column name
        
        Returns:
        Column value
        """
    
    def __contains__(self, key) -> bool:
        """
        Check if column exists.
        
        Parameters:
        - key: Column name
        
        Returns:
        True if column exists
        """

Exception Classes

SQLAlchemy integration specific exceptions.

class Error(Exception):
    """Base SQLAlchemy integration error."""

class ArgumentError(Error):
    """Invalid or conflicting function arguments."""

class InvalidRequestError(Error):
    """Invalid operations or runtime state errors."""

class NoSuchColumnError(Error):
    """Nonexistent column requested from RowProxy."""

class ResourceClosedError(Error):
    """Operation on closed connection/cursor/result."""

Usage Examples

Basic SQLAlchemy Usage

import asyncio
import sqlalchemy as sa
import aiomysql.sa

async def sqlalchemy_basic_example():
    # Create engine
    engine = await aiomysql.sa.create_engine(
        host='localhost',
        port=3306,
        user='myuser',
        password='mypass',
        db='mydatabase',
        minsize=1,
        maxsize=5
    )
    
    # Define table metadata
    metadata = sa.MetaData()
    users = sa.Table('users', metadata,
                     sa.Column('id', sa.Integer, primary_key=True),
                     sa.Column('name', sa.String(50)),
                     sa.Column('email', sa.String(100)))
    
    # Execute queries
    async with engine.acquire() as conn:
        # Select query
        query = sa.select([users]).where(users.c.id < 10)
        result = await conn.execute(query)
        
        async for row in result:
            print(f"User: {row.name} ({row.email})")
        
        await result.close()
    
    # Cleanup
    engine.close()
    await engine.wait_closed()

asyncio.run(sqlalchemy_basic_example())

Transaction Management

async def transaction_example():
    engine = await aiomysql.sa.create_engine(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase'
    )
    
    metadata = sa.MetaData()
    users = sa.Table('users', metadata,
                     sa.Column('id', sa.Integer, primary_key=True),
                     sa.Column('name', sa.String(50)),
                     sa.Column('balance', sa.Numeric(10, 2)))
    
    async with engine.acquire() as conn:
        # Begin transaction
        async with conn.begin() as trans:
            try:
                # Transfer money between users
                await conn.execute(
                    users.update().where(users.c.id == 1).values(
                        balance=users.c.balance - 100
                    )
                )
                await conn.execute(
                    users.update().where(users.c.id == 2).values(
                        balance=users.c.balance + 100
                    )
                )
                
                # Transaction commits automatically on success
                print("Transfer completed successfully")
                
            except Exception as e:
                # Transaction rolls back automatically on error
                print(f"Transfer failed: {e}")
                raise
    
    engine.close()
    await engine.wait_closed()

Nested Transactions (Savepoints)

async def nested_transaction_example():
    engine = await aiomysql.sa.create_engine(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase'
    )
    
    metadata = sa.MetaData()
    orders = sa.Table('orders', metadata,
                      sa.Column('id', sa.Integer, primary_key=True),
                      sa.Column('user_id', sa.Integer),
                      sa.Column('total', sa.Numeric(10, 2)))
    
    async with engine.acquire() as conn:
        async with conn.begin() as trans:
            # Main transaction: create order
            result = await conn.execute(
                orders.insert().values(user_id=123, total=250.00)
            )
            order_id = result.lastrowid
            
            # Nested transaction: try to apply discount
            try:
                async with conn.begin_nested() as nested_trans:
                    # Try to apply discount
                    await conn.execute(
                        orders.update().where(orders.c.id == order_id).values(
                            total=orders.c.total * 0.9  # 10% discount
                        )
                    )
                    
                    # Simulate discount validation failure
                    if order_id % 2 == 0:  # Even order IDs fail
                        raise ValueError("Discount not applicable")
                    
                    print("Discount applied successfully")
                    
            except ValueError:
                # Nested transaction rolls back, main continues
                print("Discount failed, order created without discount")
            
            print(f"Order {order_id} completed")
    
    engine.close()
    await engine.wait_closed()

Complex Queries with Joins

async def complex_query_example():
    engine = await aiomysql.sa.create_engine(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase'
    )
    
    metadata = sa.MetaData()
    users = sa.Table('users', metadata,
                     sa.Column('id', sa.Integer, primary_key=True),
                     sa.Column('name', sa.String(50)),
                     sa.Column('department_id', sa.Integer))
    
    departments = sa.Table('departments', metadata,
                           sa.Column('id', sa.Integer, primary_key=True),
                           sa.Column('name', sa.String(50)))
    
    async with engine.acquire() as conn:
        # Complex join query
        query = sa.select([
            users.c.name.label('user_name'),
            departments.c.name.label('dept_name')
        ]).select_from(
            users.join(departments, users.c.department_id == departments.c.id)
        ).where(
            departments.c.name.in_(['Engineering', 'Marketing'])
        ).order_by(users.c.name)
        
        result = await conn.execute(query)
        
        print("Users in Engineering and Marketing:")
        rows = await result.fetchall()
        for row in rows:
            print(f"{row.user_name} - {row.dept_name}")
        
        await result.close()
    
    engine.close()
    await engine.wait_closed()

Raw SQL with Parameters

async def raw_sql_example():
    engine = await aiomysql.sa.create_engine(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase'
    )
    
    async with engine.acquire() as conn:
        # Raw SQL with parameters
        query = sa.text("""
            SELECT u.name, COUNT(o.id) as order_count
            FROM users u
            LEFT JOIN orders o ON u.id = o.user_id
            WHERE u.created_at >= :start_date
            GROUP BY u.id, u.name
            HAVING COUNT(o.id) >= :min_orders
            ORDER BY order_count DESC
        """)
        
        result = await conn.execute(query, {
            'start_date': '2023-01-01',
            'min_orders': 5
        })
        
        print("Top customers by order count:")
        async for row in result:
            print(f"{row.name}: {row.order_count} orders")
        
        await result.close()
    
    engine.close()
    await engine.wait_closed()

Install with Tessl CLI

npx tessl i tessl/pypi-aiomysql

docs

connections.md

cursors.md

index.md

pooling.md

sqlalchemy.md

tile.json