MySQL driver for asyncio providing async/await support for database operations.
—
Quality: Pending — a review of whether it follows best practices has not yet been performed.
Impact: Pending — no eval scenarios have been run.
Manage multiple database connections efficiently with connection pooling. Pools maintain a collection of reusable connections, reducing the overhead of creating and destroying connections for each database operation.
Create a connection pool with configurable minimum and maximum connection limits.
def create_pool(
    minsize: int = 1,
    maxsize: int = 10,
    echo: bool = False,
    pool_recycle: int = -1,
    loop=None,
    **kwargs
) -> _PoolContextManager:
    """Create a connection pool.

    Args:
        minsize: Minimum number of connections kept in the pool.
        maxsize: Maximum number of connections in the pool (0 = unlimited).
        echo: If True, enable query logging for all connections.
        pool_recycle: Seconds after which connections are recreated
            (-1 disables recycling).
        loop: Event loop to use.
        **kwargs: Connection parameters, the same as for the connect()
            function.

    Returns:
        Pool context manager.
    """


# The Pool class manages connection lifecycle, acquisition, and release
# operations.
class Pool:
    """A pool of reusable database connections.

    Handles the lifecycle of its connections: acquisition, release,
    and shutdown.
    """

    @property
    def minsize(self) -> int:
        """Minimum pool size."""

    @property
    def maxsize(self) -> int:
        """Maximum pool size."""

    @property
    def size(self) -> int:
        """Current total number of connections."""

    @property
    def freesize(self) -> int:
        """Number of free connections available."""

    @property
    def closed(self) -> bool:
        """Whether the pool is closed."""

    def acquire(self) -> Connection:
        """Acquire a connection from the pool.

        Returns:
            Connection context manager.
        """

    def release(self, conn: Connection) -> None:
        """Return a connection to the pool.

        Args:
            conn: Connection to release back to the pool.
        """

    async def clear(self) -> None:
        """Close all free connections currently held by the pool."""

    def close(self) -> None:
        """Close the pool; mark all connections for closure when returned."""

    def terminate(self) -> None:
        """Terminate the pool immediately, closing all connections."""

    async def wait_closed(self) -> None:
        """Wait for the pool to be completely closed."""


import asyncio
import aiomysql


async def pool_example():
    """Basic pool usage: create, acquire, query, and shut down."""
    # Build a pool with explicit size bounds.
    pool = await aiomysql.create_pool(
        host='localhost',
        port=3306,
        minsize=1,
        maxsize=5,
        user='myuser',
        password='mypass',
        db='mydatabase',
    )

    # Borrow a connection; it goes back to the pool automatically
    # when the async-with block exits.
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT COUNT(*) FROM users")
            count = await cur.fetchone()
            print(f"Total users: {count[0]}")

    # Shut the pool down and wait until every connection is closed.
    pool.close()
    await pool.wait_closed()
asyncio.run(pool_example())


async def recycling_pool():
    """Demonstrate the pool_recycle and echo pool options."""
    # Pool that recreates stale connections and logs every query.
    pool = await aiomysql.create_pool(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase',
        minsize=2,
        maxsize=10,
        pool_recycle=3600,  # Recreate connections every hour
        echo=True,  # Enable query logging
    )

    # Several sequential operations through the same pool.
    for i in range(5):
        async with pool.acquire() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT SLEEP(1)")
                print(f"Operation {i+1} completed")
        print(f"Pool size: {pool.size}, Free: {pool.freesize}")

    # Cleanup
    pool.close()
await pool.wait_closed()async def worker(pool, worker_id):
"""Worker function that uses pool connections."""
for i in range(3):
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT %s, %s", (worker_id, i))
result = await cur.fetchone()
print(f"Worker {worker_id}, iteration {i}: {result}")
# Simulate work
await asyncio.sleep(0.1)
async def concurrent_example():
# Create pool
pool = await aiomysql.create_pool(
host='localhost',
user='myuser',
password='mypass',
db='mydatabase',
minsize=2,
maxsize=5
)
# Run multiple workers concurrently
tasks = [worker(pool, i) for i in range(4)]
await asyncio.gather(*tasks)
print(f"Final pool stats - Size: {pool.size}, Free: {pool.freesize}")
# Cleanup
pool.close()
await pool.wait_closed()
asyncio.run(concurrent_example())


async def context_manager_example():
    """Use the pool itself as an async context manager."""
    # The create_pool() result works directly in async-with; the pool
    # is closed automatically when the block exits.
    async with aiomysql.create_pool(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase',
        minsize=1,
        maxsize=3,
    ) as pool:
        # Three sequential acquisitions from the same pool.
        async with pool.acquire() as conn1:
            async with conn1.cursor() as cur:
                await cur.execute("INSERT INTO logs (message) VALUES ('Start')")
        async with pool.acquire() as conn2:
            async with conn2.cursor() as cur:
                await cur.execute("INSERT INTO logs (message) VALUES ('Middle')")
        async with pool.acquire() as conn3:
            async with conn3.cursor() as cur:
                await cur.execute("INSERT INTO logs (message) VALUES ('End')")
    # Pool automatically closed when exiting context
print("Pool operations completed and pool closed")async def error_handling_example():
pool = await aiomysql.create_pool(
host='localhost',
user='myuser',
password='mypass',
db='mydatabase',
minsize=1,
maxsize=3
)
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
# This will cause an error
await cur.execute("SELECT * FROM nonexistent_table")
except aiomysql.ProgrammingError as e:
print(f"SQL error: {e}")
# Connection is still returned to pool even after error
except aiomysql.OperationalError as e:
print(f"Connection error: {e}")
# Pool will handle bad connections automatically
finally:
# Always clean up pool
pool.close()
await pool.wait_closed()Install with Tessl CLI
npx tessl i tessl/pypi-aiomysql