CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-aiomysql

MySQL driver for asyncio providing async/await support for database operations.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

pooling.md · docs/

Connection Pooling

Manage multiple database connections efficiently with connection pooling. Pools maintain a collection of reusable connections, reducing the overhead of creating and destroying connections for each database operation.

Capabilities

Pool Creation

Create a connection pool with configurable minimum and maximum connection limits.

def create_pool(
    minsize: int = 1,
    maxsize: int = 10,
    echo: bool = False,
    pool_recycle: int = -1,
    loop = None,
    **kwargs
) -> _PoolContextManager:
    """
    Create a connection pool.

    The returned object supports two usage forms, both shown in the
    usage examples of this document:

    - ``pool = await create_pool(...)`` yields the pool; the caller is then
      responsible for ``pool.close()`` followed by
      ``await pool.wait_closed()``.
    - ``async with create_pool(...) as pool:`` yields the pool and closes
      it automatically when the block exits.

    Parameters:
    - minsize: Minimum number of connections in pool
    - maxsize: Maximum number of connections in pool (0 = unlimited)
    - echo: Enable query logging for all connections
    - pool_recycle: Seconds after which to recreate connections (-1 = disabled)
    - loop: Event loop to use
    - **kwargs: Connection parameters (same as connect() function), for
      example host, port, user, password and db

    Returns:
    Pool context manager
    """

Pool Management

The Pool class manages connection lifecycle, acquisition, and release operations.

class Pool:
    """
    Pool of reusable database connections.

    The usage examples in this document obtain instances from
    create_pool() and borrow connections with
    ``async with pool.acquire() as conn:``.
    """

    @property
    def minsize(self) -> int:
        """Minimum pool size."""
    
    @property
    def maxsize(self) -> int:
        """Maximum pool size."""
    
    @property
    def size(self) -> int:
        """Current total number of connections."""
    
    @property
    def freesize(self) -> int:
        """Number of free connections available."""
    
    @property
    def closed(self) -> bool:
        """Whether the pool is closed."""
    
    # NOTE(review): the annotation says Connection, while the docstring and
    # the examples treat the result as a context manager that yields a
    # connection -- confirm the precise return type against the library.
    def acquire(self) -> Connection:
        """
        Acquire a connection from the pool.

        Intended to be used as ``async with pool.acquire() as conn:``; the
        connection is returned to the pool automatically when the block
        exits.
        
        Returns:
        Connection context manager
        """
    
    def release(self, conn: Connection) -> None:
        """
        Return a connection to the pool.

        Not needed when the connection was obtained through
        ``async with pool.acquire()``, which returns it automatically.
        
        Parameters:
        - conn: Connection to release back to pool
        """
    
    async def clear(self) -> None:
        """
        Close all free connections in the pool.
        """
    
    def close(self) -> None:
        """
        Close the pool. Mark all connections for closure when returned.

        The examples follow this call with ``await pool.wait_closed()``.
        """
    
    def terminate(self) -> None:
        """
        Terminate the pool immediately, closing all connections.
        """
    
    async def wait_closed(self) -> None:
        """
        Wait for the pool to be completely closed.

        Called after close() in the usage examples.
        """

Usage Examples

Basic Pool Usage

import asyncio
import aiomysql

async def pool_example():
    """
    Run one query through a pooled connection and print the user count.

    The pool is shut down in a ``finally`` block so that it is closed even
    when acquiring a connection or executing the query raises; the
    exception itself still propagates to the caller.
    """
    # Create connection pool
    pool = await aiomysql.create_pool(
        host='localhost',
        port=3306,
        minsize=1,
        maxsize=5,
        user='myuser',
        password='mypass',
        db='mydatabase'
    )

    try:
        # Acquire connection from pool; it is automatically returned to the
        # pool when the ``async with`` block exits
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT COUNT(*) FROM users")
                count = await cur.fetchone()
                print(f"Total users: {count[0]}")
    finally:
        # Close pool when done (also on error) and wait for the shutdown
        pool.close()
        await pool.wait_closed()

# Entry point: run the example coroutine to completion.
asyncio.run(pool_example())

Pool with Connection Recycling

async def recycling_pool():
    """
    Run several sequential queries through a pool with recycling enabled.

    The pool is created with ``pool_recycle=3600`` and ``echo=True``, used
    for five one-second queries, and its size statistics are printed.
    Cleanup runs in a ``finally`` block so the pool is closed even if one
    of the queries fails.
    """
    # Create pool with connection recycling
    pool = await aiomysql.create_pool(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase',
        minsize=2,
        maxsize=10,
        pool_recycle=3600,  # Recreate connections every hour
        echo=True  # Enable query logging
    )

    try:
        # Use pool for multiple operations
        for i in range(5):
            async with pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute("SELECT SLEEP(1)")
                    print(f"Operation {i+1} completed")

        print(f"Pool size: {pool.size}, Free: {pool.freesize}")
    finally:
        # Cleanup, also when an operation above raised
        pool.close()
        await pool.wait_closed()

Concurrent Pool Operations

async def worker(pool, worker_id):
    """
    Borrow a pooled connection three times and echo a row through it.

    Each round selects ``(worker_id, i)``, prints the fetched row and then
    pauses briefly while still holding the connection, to simulate work.
    """
    for i in range(3):
        # One statement opens both the connection and its cursor; they are
        # closed in reverse order when the block exits.
        async with pool.acquire() as db_conn, db_conn.cursor() as cursor:
            await cursor.execute("SELECT %s, %s", (worker_id, i))
            result = await cursor.fetchone()
            print(f"Worker {worker_id}, iteration {i}: {result}")

            # Simulate work
            await asyncio.sleep(0.1)

async def concurrent_example():
    """
    Run four workers concurrently against one shared pool.

    The workers are awaited together with ``asyncio.gather``; afterwards
    the pool statistics are printed. Cleanup runs in a ``finally`` block so
    the pool is closed even if a worker raises.
    """
    # Create pool
    pool = await aiomysql.create_pool(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase',
        minsize=2,
        maxsize=5
    )

    try:
        # Run multiple workers concurrently
        tasks = [worker(pool, i) for i in range(4)]
        await asyncio.gather(*tasks)

        print(f"Final pool stats - Size: {pool.size}, Free: {pool.freesize}")
    finally:
        # Cleanup, also when a worker failed
        pool.close()
        await pool.wait_closed()

# Entry point: run the concurrent example coroutine to completion.
asyncio.run(concurrent_example())

Context Manager Usage

async def context_manager_example():
    """
    Insert three log rows using a pool managed by ``async with``.

    The pool is closed automatically when the outer block exits. Each
    INSERT is committed explicitly: the pool is created without
    ``autocommit=True``, and aiomysql connections do not autocommit by
    default, so without the commit the inserted rows would be discarded.
    """
    # Pool can be used as async context manager
    async with aiomysql.create_pool(
        host='localhost',
        user='myuser', 
        password='mypass',
        db='mydatabase',
        minsize=1,
        maxsize=3
    ) as pool:
        
        # Multiple operations using the pool
        async with pool.acquire() as conn1:
            async with conn1.cursor() as cur:
                await cur.execute("INSERT INTO logs (message) VALUES ('Start')")
            # Persist the row before the connection goes back to the pool
            await conn1.commit()
        
        async with pool.acquire() as conn2:
            async with conn2.cursor() as cur:
                await cur.execute("INSERT INTO logs (message) VALUES ('Middle')")
            await conn2.commit()
        
        async with pool.acquire() as conn3:
            async with conn3.cursor() as cur:
                await cur.execute("INSERT INTO logs (message) VALUES ('End')")
            await conn3.commit()
    
    # Pool automatically closed when exiting context
    print("Pool operations completed and pool closed")

Error Handling with Pools

async def error_handling_example():
    """
    Show how query errors are handled while the pool is still cleaned up.

    A query against a missing table is executed. ``ProgrammingError`` and
    ``OperationalError`` are caught and reported; any other exception
    propagates. In every case the pool is closed afterwards.
    """
    db_pool = await aiomysql.create_pool(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase',
        minsize=1,
        maxsize=3
    )

    try:
        async with db_pool.acquire() as db_conn, db_conn.cursor() as cursor:
            # The table does not exist, so this statement is expected to fail
            await cursor.execute("SELECT * FROM nonexistent_table")
    except aiomysql.ProgrammingError as e:
        # The connection was still handed back to the pool by the
        # ``async with`` block above
        print(f"SQL error: {e}")
    except aiomysql.OperationalError as e:
        # Reported as a connection-level problem
        print(f"Connection error: {e}")
    finally:
        # Shut the pool down regardless of the outcome
        db_pool.close()
        await db_pool.wait_closed()

Install with Tessl CLI

npx tessl i tessl/pypi-aiomysql

docs

connections.md

cursors.md

index.md

pooling.md

sqlalchemy.md

tile.json