CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-cymysql

Python MySQL Driver using Cython for high-performance database connectivity with async support

Pending
Overview
Eval results
Files

async-operations.mddocs/

Asynchronous Operations

Asyncio-compatible database operations including async connections, cursors, and connection pooling for high-performance concurrent database access.

Capabilities

Async Connection Creation

Create asynchronous database connections that integrate with Python's asyncio event loop for non-blocking database operations.

async def connect(host="localhost", user=None, passwd="", db=None, port=3306, 
                 unix_socket=None, charset='', sql_mode=None, read_default_file=None,
                 client_flag=0, cursorclass=None, init_command=None, connect_timeout=None,
                 ssl=None, read_default_group=None, compress="", zstd_compression_level=3,
                 named_pipe=None, conv=None, encoders=None, loop=None):
    """
    Create an asynchronous database connection.
    
    Parameters:
    - host (str): MySQL server hostname or IP address (default: "localhost")
    - user (str): Username for authentication
    - passwd (str): Password for authentication (default: "")
    - port (int): MySQL server port number (default: 3306)
    - db (str): Default database name
    - unix_socket (str): Unix socket path for local connections
    - charset (str): Character set for connection (default: '')
    - sql_mode (str): SQL mode setting for connection
    - read_default_file (str): MySQL configuration file path
    - client_flag (int): Custom flags to send to MySQL
    - cursorclass: Default cursor class for this connection
    - init_command (str): SQL command to run on connection
    - connect_timeout (int): Connection timeout in seconds
    - ssl (dict): SSL configuration parameters
    - read_default_group (str): Group to read from configuration file
    - compress (str): Compression algorithm ("zlib" or "zstd")
    - zstd_compression_level (int): ZSTD compression level (1-22, default: 3)
    - named_pipe: Not supported (raises NotImplementedError)
    - conv: Decoders dictionary for custom type marshalling
    - encoders: Encoders dictionary for custom type marshalling
    - loop: Event loop to use (default: current event loop)
    
    Returns:
    AsyncConnection: Asynchronous database connection object
    
    Raises:
    OperationalError: Connection failed
    InterfaceError: Invalid connection parameters
    """

Async Connection Management

The AsyncConnection class provides asynchronous methods for managing database connections and executing operations without blocking the event loop.

class AsyncConnection:
    async def cursor(self, cursor=None):
        """
        Create a new async cursor object.
        
        Parameters:
        - cursor: Async cursor class to instantiate (optional)
        
        Returns:
        AsyncCursor: New async cursor object
        """
    
    async def commit(self):
        """
        Commit current transaction asynchronously.
        
        Raises:
        OperationalError: Transaction commit failed
        """
    
    async def rollback(self):
        """
        Roll back current transaction asynchronously.
        
        Raises:
        OperationalError: Transaction rollback failed
        """
    
    def close(self):
        """
        Close the database connection.
        """
    
    async def ensure_closed(self):
        """
        Ensure connection is properly closed and cleaned up.
        """
    
    async def autocommit(self, value):
        """
        Enable or disable autocommit mode asynchronously.
        
        Parameters:
        - value (bool): True to enable autocommit, False to disable
        """
    
    async def ping(self):
        """
        Check if connection to server is alive asynchronously.
        
        Raises:
        OperationalError: Connection check failed
        """
    
    async def set_charset(self, charset):
        """
        Set connection character set asynchronously.
        
        Parameters:
        - charset (str): Character set name
        """

Async Cursor Operations

Asynchronous cursor providing non-blocking SQL execution and result retrieval.

class AsyncCursor:
    async def execute(self, query, args=None):
        """
        Execute a SQL statement asynchronously.
        
        Parameters:
        - query (str): SQL statement with optional %s placeholders
        - args (tuple/list/dict): Parameters to bind to placeholders
        
        Returns:
        int: Number of affected rows
        
        Raises:
        ProgrammingError: SQL syntax error
        OperationalError: Database operation error
        """
    
    async def executemany(self, query, args_list):
        """
        Execute a SQL statement multiple times asynchronously.
        
        Parameters:
        - query (str): SQL statement with %s placeholders
        - args_list (list/tuple): Sequence of parameter tuples/lists
        
        Returns:
        int: Number of affected rows from all executions
        """
    
    async def fetchone(self):
        """
        Fetch the next row asynchronously.
        
        Returns:
        tuple: Next row data, or None if no more rows
        """
    
    async def fetchmany(self, size=None):
        """
        Fetch multiple rows asynchronously.
        
        Parameters:
        - size (int): Number of rows to fetch (default: arraysize)
        
        Returns:
        list: List of row tuples
        """
    
    async def fetchall(self):
        """
        Fetch all remaining rows asynchronously.
        
        Returns:
        list: List of all remaining row tuples
        """
    
    def close(self):
        """
        Close the cursor and free resources.
        """
    
    async def __aenter__(self):
        """Async context manager entry."""
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""

Dictionary Cursor

Async cursor that returns rows as dictionaries instead of tuples.

class AsyncDictCursor(AsyncCursor):
    """
    Async cursor that returns rows as dictionaries with column names as keys.
    
    Each row is returned as a dictionary mapping column names to values,
    making it easier to access specific columns by name in async operations.
    """
    
    async def execute(self, query, args=None):
        """Execute query and prepare field mapping for dictionary results."""
        
    async def fetchone(self):
        """
        Fetch the next row as a dictionary asynchronously.
        
        Returns:
        dict: Row data as {column_name: value} mapping, or None if no more rows
        """
        
    async def fetchmany(self, size=None):
        """
        Fetch multiple rows as dictionaries asynchronously.
        
        Parameters:
        - size (int): Number of rows to fetch (default: arraysize)
        
        Returns:
        tuple: Tuple of dictionaries representing rows
        """
        
    async def fetchall(self):
        """
        Fetch all remaining rows as dictionaries asynchronously.
        
        Returns:
        tuple: Tuple of dictionaries representing all remaining rows
        """

Connection Pooling

Efficient connection pooling for high-throughput async applications with automatic connection lifecycle management.

def create_pool(minsize=1, maxsize=10, pool_recycle=-1, loop=None, **kwargs):
    """
    Create an async connection pool context manager.
    
    Parameters:
    - minsize (int): Minimum number of connections in pool
    - maxsize (int): Maximum number of connections in pool  
    - pool_recycle (int): Connection recycle time in seconds (-1 = no recycle)
    - loop: Event loop to use (default: current event loop)
    - **kwargs: Connection parameters for pool connections
    
    Returns:
    PoolContextManager: Context manager for connection pool
    """

async def _create_pool(minsize=1, maxsize=10, pool_recycle=-1, loop=None, **kwargs):
    """
    Create an async connection pool directly.
    
    Returns:
    Pool: Connection pool object
    """

class Pool:
    """
    Async connection pool managing multiple database connections.
    """
    
    def acquire(self):
        """
        Get a connection from the pool.
        
        Returns:
        PoolAcquireContextManager: Context manager for pool connection
        """
    
    async def _acquire(self):
        """
        Acquire a connection from the pool directly.
        
        Returns:
        AsyncConnection: Database connection from pool
        """
    
    def release(self, conn):
        """
        Return a connection to the pool.
        
        Parameters:
        - conn: Connection to return to pool
        """
    
    def close(self):
        """
        Close the pool and all connections.
        """
    
    async def wait_closed(self):
        """
        Wait for pool closure to complete.
        """
    
    @property
    def size(self):
        """Current number of connections in pool."""
    
    @property  
    def freesize(self):
        """Number of free connections in pool."""

Usage Examples

Basic Async Connection

import asyncio
import cymysql.aio

async def basic_query():
    conn = await cymysql.aio.connect(
        host='localhost',
        user='root',
        password='password',
        db='testdb'
    )
    
    cursor = await conn.cursor()
    await cursor.execute("SELECT COUNT(*) FROM users")
    
    result = await cursor.fetchone()
    print(f"User count: {result[0]}")
    
    cursor.close()
    conn.close()

asyncio.run(basic_query())

Async Context Managers

import asyncio
import cymysql.aio

async def context_manager_example():
    conn = await cymysql.aio.connect(
        host='localhost',
        user='root', 
        password='password',
        db='testdb'
    )
    
    # Cursor context manager
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT id, name FROM users WHERE active = %s", (True,))
        
        async for row in cursor:
            print(f"User: {row[1]} (ID: {row[0]})")
    
    conn.close()

asyncio.run(context_manager_example())

Connection Pool Usage

import asyncio
import cymysql.aio

async def pool_example():
    # Create connection pool
    pool = await cymysql.aio.create_pool(
        host='localhost',
        user='root',
        password='password', 
        db='testdb',
        minsize=1,
        maxsize=10
    )
    
    # Use pool connection
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT VERSION()")
            version = await cursor.fetchone()
            print(f"MySQL version: {version[0]}")
    
    # Clean up pool
    pool.close()
    await pool.wait_closed()

asyncio.run(pool_example())

Concurrent Database Operations

import asyncio
import cymysql.aio

async def fetch_user_data(pool, user_id):
    async with pool.acquire() as conn:
        async with conn.cursor(cymysql.aio.AsyncDictCursor) as cursor:
            await cursor.execute(
                "SELECT id, name, email FROM users WHERE id = %s", 
                (user_id,)
            )
            return await cursor.fetchone()

async def concurrent_example():
    pool = await cymysql.aio.create_pool(
        host='localhost',
        user='root',
        password='password',
        db='testdb',
        minsize=5,
        maxsize=20
    )
    
    # Fetch multiple users concurrently
    user_ids = [1, 2, 3, 4, 5]
    tasks = [fetch_user_data(pool, uid) for uid in user_ids]
    
    users = await asyncio.gather(*tasks)
    
    for user in users:
        if user:
            print(f"User: {user['name']} ({user['email']})")
    
    pool.close()
    await pool.wait_closed()

asyncio.run(concurrent_example())

Async Transaction Management

import asyncio
import cymysql.aio

async def transfer_funds(pool, from_account, to_account, amount):
    async with pool.acquire() as conn:
        try:
            # Disable autocommit for transaction
            await conn.autocommit(False)
            
            async with conn.cursor() as cursor:
                # Debit from source account
                await cursor.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, from_account)
                )
                
                # Credit to destination account
                await cursor.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s", 
                    (amount, to_account)
                )
                
                # Commit transaction
                await conn.commit()
                print(f"Transfer of ${amount} completed successfully")
                
        except Exception as e:
            # Rollback on error
            await conn.rollback()
            print(f"Transfer failed: {e}")
        finally:
            # Re-enable autocommit
            await conn.autocommit(True)

async def transaction_example():
    pool = await cymysql.aio.create_pool(
        host='localhost',
        user='root', 
        password='password',
        db='banking'
    )
    
    await transfer_funds(pool, from_account=1, to_account=2, amount=100.00)
    
    pool.close()
    await pool.wait_closed()

asyncio.run(transaction_example())

Streaming Large Result Sets

import asyncio
import cymysql.aio

async def process_large_dataset(pool):
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT * FROM large_table")
            
            # Process results in batches to avoid memory issues
            while True:
                rows = await cursor.fetchmany(1000)
                if not rows:
                    break
                    
                # Process batch
                for row in rows:
                    await process_row_async(row)
                
                print(f"Processed batch of {len(rows)} rows")

async def process_row_async(row):
    # Simulate async processing
    await asyncio.sleep(0.001)

asyncio.run(process_large_dataset(pool))

Error Handling in Async Operations

import asyncio
import cymysql.aio
from cymysql import OperationalError, ProgrammingError

async def robust_async_operation():
    pool = None
    try:
        pool = await cymysql.aio.create_pool(
            host='localhost',
            user='root',
            password='password',
            db='testdb',
            minsize=1,
            maxsize=5
        )
        
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("SELECT COUNT(*) FROM nonexistent_table")
                result = await cursor.fetchone()
                
    except OperationalError as e:
        print(f"Database operation error: {e}")
    except ProgrammingError as e:
        print(f"SQL programming error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        if pool:
            pool.close()
            await pool.wait_closed()

asyncio.run(robust_async_operation())

Performance Best Practices

  • Use connection pooling for applications with multiple concurrent database operations
  • Set appropriate minsize and maxsize for your application's concurrency needs
  • Use fetchmany() for large result sets to control memory usage
  • Properly close connections and pools to avoid resource leaks
  • Consider pool_recycle parameter for long-running applications
  • Use async context managers for automatic resource cleanup

Install with Tessl CLI

npx tessl i tessl/pypi-cymysql

docs

async-operations.md

connections.md

cursors.md

data-types.md

error-handling.md

index.md

tile.json