CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-mysql-connector-python

A self-contained Python driver for communicating with MySQL servers, using an API that is compliant with the Python Database API Specification v2.0 (PEP 249).

Pending
Overview
Eval results
Files

docs/async.md

Asynchronous Operations

Perform database operations asynchronously using asyncio with full async/await support, providing non-blocking database access for high-concurrency applications.

Async Connection Management

Async Connection Functions

import mysql.connector.aio

async def connect(**kwargs) -> 'MySQLConnection':
    """Open an asynchronous connection to a MySQL server.

    Interface stub: the body is intentionally empty.

    Args:
        **kwargs: Connection options, passed as keyword arguments.

    Returns:
        A MySQLConnection instance to use for async operations.
    """
    ...

MySQLConnection (Async)

class MySQLConnection:
    """Asyncio-aware connection to a MySQL server.

    Interface stub: every body is intentionally empty. Members declared
    with ``async def`` are coroutines and must be awaited; plain methods
    and properties are called or read directly.
    """

    def __init__(self, **kwargs) -> None:
        """Set up the connection object from keyword configuration."""
        ...

    # --- Connection lifecycle -------------------------------------------

    async def connect(self) -> None:
        """Open the connection to the MySQL server."""
        ...

    async def disconnect(self) -> None:
        """Shut down the connection to the MySQL server."""
        ...

    async def close(self) -> None:
        """Close the connection; alias for ``disconnect``."""
        ...

    def is_connected(self) -> bool:
        """Report whether the connection is active (non-blocking check)."""
        ...

    async def ping(
        self,
        reconnect: bool = False,
        attempts: int = 1,
        delay: int = 0,
    ) -> None:
        """Test the connection to the server."""
        ...

    async def reconnect(self, attempts: int = 1, delay: int = 0) -> None:
        """Re-establish the connection to the MySQL server."""
        ...

    # NOTE(review): the upstream Connector/Python asyncio guide shows
    # ``await cnx.cursor()`` — confirm whether this should be a coroutine.
    def cursor(
        self,
        buffered: Optional[bool] = None,
        raw: Optional[bool] = None,
        prepared: Optional[bool] = None,
        cursor_class: Optional[Type] = None,
        dictionary: Optional[bool] = None,
    ) -> 'MySQLCursor':
        """Return an async cursor for executing SQL statements."""
        ...

    # --- Transactions ---------------------------------------------------

    async def commit(self) -> None:
        """Commit the current transaction."""
        ...

    async def rollback(self) -> None:
        """Roll back the current transaction."""
        ...

    async def start_transaction(
        self,
        consistent_snapshot: bool = False,
        isolation_level: Optional[str] = None,
        readonly: Optional[bool] = None,
    ) -> None:
        """Begin a new transaction."""
        ...

    # --- Session settings -----------------------------------------------

    @property
    def autocommit(self) -> bool:
        """Current autocommit mode."""
        ...

    async def set_autocommit(self, value: bool) -> None:
        """Change the autocommit mode."""
        ...

    @property
    def database(self) -> str:
        """Name of the current database."""
        ...

    async def set_database(self, value: str) -> None:
        """Switch the current database."""
        ...

    @property
    def server_version(self) -> Tuple[int, int, int]:
        """MySQL server version as a tuple."""
        ...

    @property
    def connection_id(self) -> int:
        """MySQL connection ID."""
        ...

    @property
    def charset(self) -> str:
        """Character set used by the connection."""
        ...

    async def set_charset(self, value: str) -> None:
        """Change the connection character set."""
        ...

    # --- Low-level server commands --------------------------------------

    async def cmd_query(self, query: Union[str, bytes]) -> Dict:
        """Run a query and return the raw result."""
        ...

    async def cmd_quit(self) -> bytes:
        """Send the quit command to the server."""
        ...

    async def cmd_init_db(self, database: str) -> bytes:
        """Send the init_db command to change database."""
        ...

    async def cmd_refresh(self, options: int) -> bytes:
        """Send the refresh command."""
        ...

    async def cmd_statistics(self) -> Dict:
        """Fetch server statistics."""
        ...

    async def cmd_ping(self) -> bytes:
        """Send the ping command to the server."""
        ...

    async def reset_session(
        self,
        user_variables: Optional[Dict] = None,
        session_variables: Optional[Dict] = None,
    ) -> None:
        """Reset the session to its initial state."""
        ...

    # --- Information about the last statement ---------------------------

    async def get_warnings(self, count: Optional[int] = None) -> List[Tuple]:
        """Fetch the warning messages produced by the last statement."""
        ...

    @property
    def warning_count(self) -> int:
        """Number of warnings from the last statement."""
        ...

    @property
    def info_msg(self) -> Optional[str]:
        """Info message from the last statement."""
        ...

    @property
    def insert_id(self) -> int:
        """Auto-generated ID from the last INSERT."""
        ...

    @property
    def affected_rows(self) -> int:
        """Affected row count from the last statement."""
        ...

    @property
    def in_transaction(self) -> bool:
        """Whether the connection is inside a transaction."""
        ...

    # --- Async context manager protocol ---------------------------------

    async def __aenter__(self) -> 'MySQLConnection':
        """Enter the ``async with`` block."""
        ...

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Leave the ``async with`` block, cleaning up automatically."""
        ...

MySQLConnectionAbstract

class MySQLConnectionAbstract:
    """Abstract base class for async connections.

    Declares the interface that async connection implementations follow.
    Interface stub: no members are shown here.
    """

Async Cursor Operations

MySQLCursor (Async)

class MySQLCursor:
    """Async cursor used to run SQL statements and read their results.

    Interface stub: every body is intentionally empty. Members declared
    with ``async def`` are coroutines and must be awaited.
    """

    # --- Statement execution --------------------------------------------

    async def execute(
        self,
        operation: str,
        params: Optional[Union[Sequence, Dict]] = None,
        multi: bool = False,
    ) -> Optional[AsyncIterator]:
        """Run one SQL statement, optionally with parameters."""
        ...

    async def executemany(
        self,
        operation: str,
        seq_params: Sequence[Union[Sequence, Dict]],
    ) -> None:
        """Run the same SQL statement multiple times."""
        ...

    # --- Fetching rows --------------------------------------------------

    async def fetchone(self) -> Optional[Tuple]:
        """Fetch the next row of the result set."""
        ...

    async def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
        """Fetch the specified number of rows."""
        ...

    async def fetchall(self) -> List[Tuple]:
        """Fetch every remaining row."""
        ...

    async def close(self) -> None:
        """Close the cursor and free its resources."""
        ...

    # --- Stored procedures and multiple result sets ---------------------

    async def callproc(self, procname: str, args: Sequence = ()) -> Optional[Dict]:
        """Call a stored procedure."""
        ...

    def stored_results(self) -> AsyncIterator['MySQLCursor']:
        """Return an async iterator over stored procedure result sets."""
        ...

    async def nextset(self) -> Optional[bool]:
        """Skip to the next result set."""
        ...

    # --- Metadata about the last operation ------------------------------

    @property
    def description(self) -> Optional[List[Tuple]]:
        """Column metadata for the last executed query."""
        ...

    @property
    def rowcount(self) -> int:
        """Number of rows affected by the last operation."""
        ...

    @property
    def lastrowid(self) -> Optional[int]:
        """Auto-generated ID from the last INSERT operation."""
        ...

    @property
    def arraysize(self) -> int:
        """Default number of rows ``fetchmany()`` should return."""
        ...

    @arraysize.setter
    def arraysize(self, value: int) -> None:
        """Set the default ``fetchmany()`` size."""
        ...

    @property
    def statement(self) -> Optional[str]:
        """Last executed SQL statement."""
        ...

    @property
    def with_rows(self) -> bool:
        """Whether the last operation produced result rows."""
        ...

    @property
    def column_names(self) -> Tuple[str, ...]:
        """Column names of the result set."""
        ...

    # --- Async iteration and context manager protocols ------------------

    def __aiter__(self) -> 'MySQLCursor':
        """Make the cursor async-iterable over result rows."""
        ...

    async def __anext__(self) -> Tuple:
        """Return the next row during async iteration."""
        ...

    async def __aenter__(self) -> 'MySQLCursor':
        """Enter the ``async with`` block."""
        ...

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Leave the ``async with`` block, cleaning up automatically."""
        ...

Async Connection Pooling

MySQLConnectionPool (Async)

class MySQLConnectionPool:
    """Manager for a pool of async database connections kept for reuse.

    Interface stub: every body is intentionally empty.
    """

    def __init__(
        self,
        pool_name: Optional[str] = None,
        pool_size: int = 5,
        pool_reset_session: bool = True,
        **kwargs,
    ) -> None:
        """Set up the async connection pool."""
        ...

    async def get_connection(self) -> 'PooledMySQLConnection':
        """Take a connection from the pool."""
        ...

    async def add_connection(
        self,
        cnx: Optional['MySQLConnection'] = None,
    ) -> 'PooledMySQLConnection':
        """Add a connection to the pool."""
        ...

    def set_config(self, **kwargs) -> None:
        """Update the pool configuration."""
        ...

    async def close(self) -> None:
        """Close every connection in the pool."""
        ...

    @property
    def pool_name(self) -> str:
        """Name identifying the pool."""
        ...

    @property
    def pool_size(self) -> int:
        """Maximum size of the pool."""
        ...

PooledMySQLConnection (Async)

class PooledMySQLConnection:
    """Wrapper around an async connection that belongs to a pool.

    Closing the wrapper returns the connection to the pool.
    Interface stub: every body is intentionally empty.
    """

    def __init__(self, pool: MySQLConnectionPool, cnx: 'MySQLConnection') -> None:
        """Set up the wrapper for the given pool and connection."""
        ...

    async def close(self) -> None:
        """Hand the connection back to the pool."""
        ...

    @property
    def pool_name(self) -> str:
        """Name of the connection pool."""
        ...

    async def __aenter__(self) -> 'PooledMySQLConnection':
        """Enter the ``async with`` block."""
        ...

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Leave the ``async with`` block, returning the connection to the pool."""
        ...

Async Pooling Functions

async def connect(**kwargs) -> Union[MySQLConnection, PooledMySQLConnection]:
    """Open an async database connection, pooled or plain.

    Interface stub: the body is intentionally empty.

    Returns:
        A PooledMySQLConnection when pool parameters are provided,
        otherwise a MySQLConnection.
    """
    ...

Usage Examples

Basic Async Connection

import asyncio
import mysql.connector.aio

async def main():
    """Run one query over a manually managed async connection.

    Opens a connection, prints the id and name of every row of ``users``
    with ``age > 25``, then closes the cursor and the connection. The
    closes run in ``finally`` blocks so they also happen when the query
    or the iteration raises.
    """
    # Create async connection (connect() is a coroutine, so await it)
    connection = await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    )
    try:
        # Create async cursor
        cursor = connection.cursor()
        try:
            # Execute query asynchronously
            await cursor.execute("SELECT id, name FROM users WHERE age > %s", (25,))

            # Fetch results asynchronously
            async for (user_id, name) in cursor:
                print(f"User {user_id}: {name}")
        finally:
            # Release the cursor even if the query failed
            await cursor.close()
    finally:
        # Release the connection even if cursor creation or the query failed
        await connection.close()

# Run async function
asyncio.run(main())

Async Context Managers

import asyncio
import mysql.connector.aio

async def main():
    """Print the number of rows in ``users``, using async context managers.

    Both the connection and the cursor are entered with ``async with`` so
    they are cleaned up automatically when their block exits.
    """
    # connect() is a coroutine: await it to get the connection object,
    # then enter that object as the async context manager.
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        # Automatic async cursor cleanup; dictionary=True lets the row be
        # read by column alias below.
        async with connection.cursor(dictionary=True) as cursor:
            await cursor.execute("SELECT COUNT(*) as total FROM users")
            result = await cursor.fetchone()
            print(f"Total users: {result['total']}")
            # Cursor automatically closed
        # Connection automatically closed

asyncio.run(main())

Async Transaction Management

import asyncio
import mysql.connector.aio

async def transfer_funds(from_account: int, to_account: int, amount: float):
    """Move ``amount`` from one ``accounts`` row to another in one transaction.

    Args:
        from_account: ``id`` of the account to debit.
        to_account: ``id`` of the account to credit.
        amount: Value subtracted from the source balance and added to the
            destination balance.

    Raises:
        ValueError: If either UPDATE affects no rows.
        Exception: Any other error is re-raised after the transaction has
            been rolled back and the failure printed.
    """
    # connect() is a coroutine: await it, then enter the connection as the
    # async context manager.
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        try:
            # Start transaction
            await connection.start_transaction()

            async with connection.cursor() as cursor:
                # Debit from account
                await cursor.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, from_account)
                )
                # rowcount only reflects the most recent statement, so the
                # debit has to be checked before the credit overwrites it.
                if cursor.rowcount == 0:
                    raise ValueError("Account not found")

                # Credit to account
                await cursor.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to_account)
                )
                if cursor.rowcount == 0:
                    raise ValueError("Account not found")

            # Commit transaction
            await connection.commit()
            print(f"Transferred {amount} from {from_account} to {to_account}")

        except Exception as err:
            # Rollback on error, report it, and let the caller see it
            await connection.rollback()
            print(f"Transfer failed: {err}")
            raise

asyncio.run(transfer_funds(1, 2, 100.0))

Async Connection Pooling

import asyncio
import mysql.connector.aio

async def worker_task(worker_id: int):
    """Run a one-second ``SLEEP`` query on a pooled connection.

    Args:
        worker_id: Number used only to label the completion message.
    """
    # Get connection from async pool. connect() is a coroutine, so it is
    # awaited first and the returned connection is entered as the async
    # context manager.
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase',
        pool_name='async_pool',
        pool_size=5
    ) as connection:

        async with connection.cursor() as cursor:
            await cursor.execute("SELECT SLEEP(%s)", (1,))
            # Consume the single result row before the cursor is closed
            await cursor.fetchone()

        print(f"Async worker {worker_id} completed")

async def main():
    """Start ten ``worker_task`` coroutines and wait for all of them."""
    # One coroutine per worker id 0..9, all created up front
    pending = list(map(worker_task, range(10)))

    # gather() runs them concurrently and returns once every one is done
    await asyncio.gather(*pending)
    print("All async workers completed")

asyncio.run(main())

Async Batch Processing

import asyncio
import mysql.connector.aio

async def process_batch(batch_data: list[dict]):
    """Insert one batch of items into ``processed_data`` and commit.

    Args:
        batch_data: Items to insert; each must provide ``'id'`` and
            ``'value'`` keys.
    """
    # The annotation uses the builtin generics because this snippet does
    # not import ``typing``; ``List[Dict]`` would raise NameError when the
    # function is defined.
    #
    # connect() is a coroutine: await it, then enter the connection as the
    # async context manager.
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        async with connection.cursor() as cursor:
            # Prepare batch insert
            insert_query = "INSERT INTO processed_data (id, value, timestamp) VALUES (%s, %s, NOW())"

            # Execute batch asynchronously, one parameterized INSERT per item
            for item in batch_data:
                await cursor.execute(insert_query, (item['id'], item['value']))

            # A single commit covers every row of the batch
            await connection.commit()
            print(f"Processed batch of {len(batch_data)} items")

async def main():
    """Run ``process_batch`` on three sample batches concurrently."""
    # Sample data: three batches of two items each
    first_batch = [{'id': 1, 'value': 'A'}, {'id': 2, 'value': 'B'}]
    second_batch = [{'id': 3, 'value': 'C'}, {'id': 4, 'value': 'D'}]
    third_batch = [{'id': 5, 'value': 'E'}, {'id': 6, 'value': 'F'}]
    batches = [first_batch, second_batch, third_batch]

    # One coroutine per batch, all created before gather() runs them
    pending = list(map(process_batch, batches))
    await asyncio.gather(*pending)
    print("All batches processed")

asyncio.run(main())

Async Result Streaming

import asyncio
import mysql.connector.aio

async def stream_large_dataset():
    """Iterate over every row of ``large_table`` and print a line per row.

    Rows are consumed with ``async for``; after every 1000 rows the
    coroutine awaits ``asyncio.sleep(0)`` so other tasks get a turn.
    """
    # connect() is a coroutine: await it, then enter the connection as the
    # async context manager.
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        async with connection.cursor() as cursor:
            await cursor.execute("SELECT * FROM large_table ORDER BY id")

            # Stream results asynchronously
            count = 0
            async for row in cursor:
                # Process each row as it arrives
                print(f"Processing row {count}: {row[0]}")
                count += 1

                # Yield control to other tasks periodically
                if count % 1000 == 0:
                    await asyncio.sleep(0)  # Yield control

            print(f"Streamed {count} rows")

asyncio.run(stream_large_dataset())

Async with Multiple Databases

import asyncio
import mysql.connector.aio

async def sync_data_between_databases():
    """Copy recently updated ``users`` rows from a source to a destination DB.

    Reads rows of ``users`` whose ``updated_at`` is later than
    ``'2024-01-01'`` from the source connection and upserts ``id``,
    ``name`` and ``email`` into the destination, committing once at the end.

    Connections and cursors are opened with ``async with`` so that each one
    is released even if a later step fails: a failed second connect, a
    failed cursor creation, or an error part-way through the copy.
    """
    # Connect to source database (connect() is a coroutine, so await it
    # before entering the connection as a context manager)
    async with await mysql.connector.aio.connect(
        host='source.mysql.example.com',
        user='myuser',
        password='mypassword',
        database='source_db'
    ) as source_conn:

        # Connect to destination database; if this fails the source
        # connection is still closed by the enclosing block
        async with await mysql.connector.aio.connect(
            host='dest.mysql.example.com',
            user='myuser',
            password='mypassword',
            database='dest_db'
        ) as dest_conn:

            # Get cursors for both connections; the source rows are read
            # as dictionaries so columns can be picked by name
            async with source_conn.cursor(dictionary=True) as source_cursor, \
                    dest_conn.cursor() as dest_cursor:

                # Read from source
                await source_cursor.execute("SELECT * FROM users WHERE updated_at > %s", ('2024-01-01',))

                # Process and insert to destination
                async for user in source_cursor:
                    await dest_cursor.execute(
                        "INSERT INTO users (id, name, email) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE name=%s, email=%s",
                        (user['id'], user['name'], user['email'], user['name'], user['email'])
                    )

            # Commit destination changes
            await dest_conn.commit()
            print("Data sync completed")

asyncio.run(sync_data_between_databases())

Async with Timeout and Error Handling

import asyncio
import mysql.connector.aio

async def query_with_timeout():
    """Run a query under an overall 30-second deadline and report failures.

    ``asyncio.timeout(30)`` bounds the whole connect/execute/fetch
    sequence, and ``connect_timeout=10`` is passed to ``connect``.
    Timeouts, ``mysql.connector.Error`` and any other exception are
    printed rather than propagated to the caller.
    """
    try:
        # Set timeout for entire operation
        async with asyncio.timeout(30):  # 30 second timeout
            # connect() is a coroutine: await it, then enter the
            # connection as the async context manager.
            async with await mysql.connector.aio.connect(
                host='localhost',
                user='myuser',
                password='mypassword',
                database='mydatabase',
                connect_timeout=10  # Connection timeout
            ) as connection:

                async with connection.cursor() as cursor:
                    # Long-running query
                    await cursor.execute("SELECT * FROM large_table WHERE complex_condition = %s", ('value',))

                    results = await cursor.fetchall()
                    print(f"Query returned {len(results)} rows")

    except asyncio.TimeoutError:
        print("Query timed out")
    except mysql.connector.Error as err:
        print(f"Database error: {err}")
    except Exception as err:
        print(f"Unexpected error: {err}")

asyncio.run(query_with_timeout())

Install with Tessl CLI

npx tessl i tessl/pypi-mysql-connector-python

docs

async.md

auth.md

connection.md

constants.md

cursors.md

errors.md

index.md

pooling.md

types.md

utilities.md

tile.json