"""
Python SQL toolkit and Object Relational Mapper providing the full power and flexibility of SQL.

Asynchronous database operations with async engines, connections, sessions, and ORM support for modern async Python applications. SQLAlchemy's async support enables non-blocking database operations in asyncio applications.

Create asynchronous database engines for async/await database operations.
"""
def create_async_engine(url, **kwargs):
    """
    Create asynchronous database engine.

    This is a plain factory function, not a coroutine: call it directly
    without ``await``, e.g. ``engine = create_async_engine(url, echo=True)``.

    Parameters:
    - url: str or URL, database connection URL with async driver
    - echo: bool, log all SQL statements
    - pool_size: int, connection pool size
    - max_overflow: int, maximum pool overflow
    - pool_timeout: int, connection timeout in seconds
    - pool_recycle: int, connection recycle time

    Returns:
    AsyncEngine: Asynchronous database engine
    """
class AsyncEngine:
    """Asynchronous database engine with connection pooling."""

    def connect(self):
        """
        Create new async database connection.

        Not a coroutine: the returned object is entered with
        ``async with engine.connect() as conn:``.

        Returns:
        AsyncConnection: New async database connection
        """

    # NOTE(review): SQLAlchemy 2.0's AsyncEngine does not appear to expose
    # execute(); statements are normally run on a connection. Confirm against
    # the API reference before relying on this entry.
    async def execute(self, statement, parameters=None):
        """
        Execute statement with automatic connection management.

        Parameters:
        - statement: str or executable, SQL statement
        - parameters: dict or sequence, bound parameters

        Returns:
        Result: Query results
        """

    def begin(self):
        """
        Begin transaction with automatic connection management.

        Not a coroutine: used as ``async with engine.begin() as conn:``.

        Returns:
        Async context manager yielding an AsyncConnection inside a
        transaction, which is committed when the block exits without error
        """

    async def dispose(self):
        """Close all connections and dispose of connection pool."""

    @property
    def sync_engine(self):
        """
        Get synchronous engine for metadata operations.

        Read as an attribute (``engine.sync_engine``), not called.

        Returns:
        Engine: Synchronous engine for DDL operations
        """

    @property
    def dialect(self):
        """Database dialect for this engine."""


# Asynchronous connection handling with transaction support.
class AsyncConnection:
    """Asynchronous database connection with transaction support."""

    async def execute(self, statement, parameters=None):
        """
        Execute SQL statement on this async connection.

        Parameters:
        - statement: str or executable, SQL statement
        - parameters: dict or sequence, bound parameters

        Returns:
        Result: Query results
        """

    def begin(self):
        """
        Begin transaction on this connection.

        Not a coroutine: the returned transaction is entered with
        ``async with conn.begin():``.

        Returns:
        AsyncTransaction: Async transaction object
        """

    async def commit(self):
        """Commit current transaction."""

    async def rollback(self):
        """Rollback current transaction."""

    async def close(self):
        """Close this async connection."""

    async def scalar(self, statement, parameters=None):
        """
        Execute statement and return scalar result.

        Parameters:
        - statement: str or executable, SQL statement
        - parameters: dict or sequence, bound parameters

        Returns:
        Any: Single scalar value
        """

    async def run_sync(self, fn, *args, **kwargs):
        """
        Run a synchronous callable on this connection.

        Used for synchronous-only APIs such as metadata DDL, e.g.
        ``await conn.run_sync(Base.metadata.create_all)``.

        Parameters:
        - fn: synchronous callable; it is invoked with the underlying
          synchronous connection as its first argument
        - args: additional positional arguments passed to fn
        - kwargs: additional keyword arguments passed to fn

        Returns:
        Any: Return value of fn
        """

    def get_transaction(self):
        """
        Get current transaction for this connection.

        Returns:
        AsyncTransaction or None: Current transaction
        """


# Asynchronous transaction handling with context manager support.
class AsyncTransaction:
    """Asynchronous database transaction with rollback support."""

    async def commit(self):
        """Commit this async transaction."""

    async def rollback(self):
        """Rollback this async transaction."""

    async def close(self):
        """Close transaction (rollback if not committed)."""

    @property
    def is_active(self):
        """
        Check if transaction is active.

        Read as an attribute (``transaction.is_active``), not called.

        Returns:
        bool: True if transaction is active
        """


# Asynchronous ORM session with identity map and unit of work patterns.
class AsyncSession:
    """Asynchronous ORM session with identity map and unit of work.

    Supports ``async with AsyncSession(engine) as session:``.
    """

    def __init__(self, bind=None, **kwargs):
        """
        Create async ORM session.

        Parameters:
        - bind: AsyncEngine for database operations (optional, matching the
          ``bind=None`` default of async_sessionmaker)
        - autoflush: bool, auto-flush before queries (default True)
        - expire_on_commit: bool, expire objects after commit (default True)
        """

    async def __aenter__(self):
        """
        Enter the ``async with`` block.

        Returns:
        AsyncSession: This session
        """

    async def __aexit__(self, type_, value, traceback):
        """Leave the ``async with`` block, closing the session."""

    def add(self, instance):
        """
        Add object instance to session.

        Parameters:
        - instance: mapped object to add
        """

    def add_all(self, instances):
        """
        Add multiple object instances to session.

        Parameters:
        - instances: iterable of mapped objects
        """

    async def delete(self, instance):
        """
        Mark object instance for deletion.

        Parameters:
        - instance: mapped object to delete
        """

    async def commit(self):
        """Flush pending changes and commit transaction."""

    async def rollback(self):
        """Rollback current transaction and expire all objects."""

    async def flush(self):
        """Flush pending changes to database without committing."""

    def expunge(self, instance):
        """
        Remove instance from session without deleting.

        Parameters:
        - instance: mapped object to remove from session
        """

    def expunge_all(self):
        """Remove all instances from session."""

    async def refresh(self, instance, attribute_names=None):
        """
        Refresh object from database.

        Parameters:
        - instance: mapped object to refresh
        - attribute_names: specific attributes to refresh
        """

    async def merge(self, instance):
        """
        Merge detached instance into session.

        Parameters:
        - instance: detached mapped object

        Returns:
        object: Merged persistent instance
        """

    async def execute(self, statement, parameters=None, **kwargs):
        """
        Execute statement with ORM-level processing.

        Parameters:
        - statement: SQL statement or ORM query
        - parameters: bind parameters

        Returns:
        Result: Query results
        """

    async def scalar(self, statement, parameters=None, **kwargs):
        """
        Execute statement and return scalar result.

        Parameters:
        - statement: SQL statement or ORM query
        - parameters: bind parameters

        Returns:
        Any: Scalar result value
        """

    async def get(self, entity, ident):
        """
        Get object by primary key.

        Parameters:
        - entity: mapped class
        - ident: primary key value or tuple

        Returns:
        object or None: Object instance or None if not found
        """

    async def stream(self, statement):
        """
        Execute statement and return async result stream.

        Parameters:
        - statement: SQL statement or ORM query

        Returns:
        AsyncResult: Streaming query results
        """

    async def close(self):
        """Close the async session."""
def async_sessionmaker(bind=None, **kwargs):
    """
    Create AsyncSession factory.

    The returned factory is called to obtain a session, typically as
    ``async with factory() as session:``.

    Parameters:
    - bind: AsyncEngine for database operations
    - kwargs: AsyncSession configuration options

    Returns:
    async_sessionmaker: AsyncSession factory class
    """
def async_scoped_session(session_factory, scopefunc=None):
    """
    Create async scoped session with context-local storage.

    Parameters:
    - session_factory: async_sessionmaker instance
    - scopefunc: callable returning the key that identifies the current
      scope (for example ``asyncio.current_task``). It defaults to None here
      only to keep the one-argument call form working; SQLAlchemy itself
      requires a scope function for async scoped sessions.

    Returns:
    async_scoped_session: Context-local async session proxy
    """


# Asynchronous result iteration and processing.
class AsyncResult:
    """Asynchronous query result with async iteration."""

    async def fetchone(self):
        """
        Fetch next row asynchronously.

        Returns:
        Row or None: Next row or None if no more rows
        """

    async def fetchmany(self, size=None):
        """
        Fetch multiple rows asynchronously.

        Parameters:
        - size: int, number of rows to fetch

        Returns:
        List[Row]: List of rows
        """

    async def fetchall(self):
        """
        Fetch all remaining rows asynchronously.

        Returns:
        List[Row]: All remaining rows
        """

    async def scalar(self):
        """
        Fetch scalar value from first column of first row.

        Returns:
        Any: Scalar value or None
        """

    def mappings(self):
        """
        Return result as async mapping-like objects.

        Returns:
        AsyncMappingResult: Result with dict-like row access
        """

    def scalars(self, index=0):
        """
        Return result as async scalar values.

        Parameters:
        - index: int, column index for scalar extraction

        Returns:
        AsyncScalarResult: Result with scalar value iteration
        """

    def partitions(self, size=None):
        """
        Partition result into chunks for async processing.

        Parameters:
        - size: int, partition size

        Returns:
        AsyncIterator: Async iterator of row partitions
        """

    # __aiter__ must be a plain method: ``async for`` calls it without
    # awaiting and expects an object that implements __anext__.
    def __aiter__(self):
        """Async iterator support for result rows."""

    async def __anext__(self):
        """Get next row in async iteration."""
class AsyncScalarResult:
    """Async result optimized for scalar value iteration.

    Supports ``async for value in result.scalars():``.
    """

    async def all(self):
        """
        Fetch all scalar values.

        Returns:
        List[Any]: All scalar values
        """

    async def first(self):
        """
        Fetch first scalar value.

        Returns:
        Any or None: First scalar value or None
        """

    async def one(self):
        """
        Fetch exactly one scalar value.

        Returns:
        Any: Single scalar value

        Raises:
        NoResultFound: If no results
        MultipleResultsFound: If multiple results
        """

    async def one_or_none(self):
        """
        Fetch one scalar value or None.

        Returns:
        Any or None: Single scalar value or None

        Raises:
        MultipleResultsFound: If multiple results
        """

    # Plain method, not a coroutine: ``async for`` calls __aiter__ without
    # awaiting it.
    def __aiter__(self):
        """Async iterator support for scalar values."""

    async def __anext__(self):
        """Get next scalar value in async iteration."""


# Utility functions for async SQLAlchemy operations.
# NOTE(review): the usage example in this file calls run_sync as a connection
# method (``await conn.run_sync(Base.metadata.create_all)``); confirm whether
# a module-level function with this signature is actually importable.
async def run_sync(fn, *args, **kwargs):
    """
    Run synchronous function in async context.

    Parameters:
    - fn: synchronous function to run
    - args: positional arguments
    - kwargs: keyword arguments

    Returns:
    Any: Function result
    """
async def greenlet_spawn(fn, *args, **kwargs):
    """
    Spawn greenlet for sync operations in async context.

    This is a coroutine function: the call must be awaited, e.g.
    ``result = await greenlet_spawn(fn, *args, **kwargs)``.

    Parameters:
    - fn: function to run in greenlet
    - args: positional arguments
    - kwargs: keyword arguments

    Returns:
    Any: Function result
    """


import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from sqlalchemy import String, Integer, select
# Define models
class Base(DeclarativeBase):
    """Declarative base class that the example ORM models inherit from."""
class User(Base):
    """ORM model mapped to the ``users`` table, used by the usage examples."""

    __tablename__ = 'users'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(100))
    # The streaming example filters on User.active, so the model has to
    # define it. Defaulting to True keeps User(name=..., email=...) working.
    active: Mapped[bool] = mapped_column(default=True)
async def main():
    """Create the tables, insert one user, then query and print matching users."""
    # Create async engine (note: driver must support async)
    engine = create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/dbname",
        echo=True
    )

    # Create tables (DDL is synchronous, so it is driven through run_sync)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # Use async session
    async with AsyncSession(engine) as session:
        # Add new user
        new_user = User(name="Alice", email="alice@example.com")
        session.add(new_user)
        await session.commit()

        # Query users
        stmt = select(User).where(User.name.like('%Alice%'))
        result = await session.execute(stmt)
        users = result.scalars().all()
        for user in users:
            print(f"User: {user.name}, Email: {user.email}")

    await engine.dispose()


# Run async function
asyncio.run(main())


# text() is not among the names imported at the top of the file.
from sqlalchemy import text


async def database_operations():
    """Run raw SQL and an explicit transaction on a single connection."""
    engine = create_async_engine("sqlite+aiosqlite:///async_example.db")

    # Direct connection usage
    async with engine.connect() as conn:
        # Execute raw SQL
        result = await conn.execute(text("SELECT 1"))
        value = result.scalar()
        print(f"Result: {value}")

        # The SELECT above autobegan a transaction. End it first: calling
        # begin() while a transaction is already open raises
        # InvalidRequestError.
        await conn.commit()

        # Transaction management
        async with conn.begin():
            await conn.execute(
                User.__table__.insert().values(name="Bob", email="bob@example.com")
            )
            # Automatically committed

    await engine.dispose()


async def stream_large_dataset():
    """Stream matching users row by row instead of loading them all at once."""
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

    async with AsyncSession(engine) as session:
        # Stream results for memory efficiency
        # NOTE: assumes the User model defines a boolean ``active`` column.
        stmt = select(User).where(User.active.is_(True))
        stream = await session.stream(stmt)
        async for user in stream.scalars():
            print(f"Processing user: {user.name}")
            # Process user without loading all into memory

    await engine.dispose()


from sqlalchemy.ext.asyncio import async_sessionmaker

# Create reusable session factory
async_session = async_sessionmaker(
    create_async_engine("postgresql+asyncpg://user:pass@localhost/db"),
    expire_on_commit=False
)


async def get_user_by_id(user_id: int):
    """Look up a User by primary key using a session from the factory."""
    async with async_session() as session:
        return await session.get(User, user_id)


async def create_user(name: str, email: str):
    """Insert a new User, commit, and return the instance."""
    async with async_session() as session:
        user = User(name=name, email=email)
        session.add(user)
        await session.commit()
        # The factory sets expire_on_commit=False, so the attributes stay
        # loaded after the commit and the session closing.
        return user


async def mixed_operations():
    """Combine synchronous metadata operations with async data access."""
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

    # Some operations need a sync connection (like metadata operations).
    # Calling Base.metadata.create_all(engine.sync_engine) directly from a
    # coroutine fails with MissingGreenlet, so run the DDL through run_sync().
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # Use async engine for data operations
    async with AsyncSession(engine) as session:
        stmt = select(User).limit(10)
        result = await session.execute(stmt)
        users = result.scalars().all()
        for user in users:
            print(f"User: {user.name}")

    await engine.dispose()


from sqlalchemy.exc import IntegrityError, NoResultFound


async def safe_user_operations():
    """Show handling of IntegrityError on insert and NoResultFound on lookup."""
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

    async with AsyncSession(engine) as session:
        try:
            # Attempt to create user with duplicate email
            user = User(name="Test", email="existing@example.com")
            session.add(user)
            await session.commit()
        except IntegrityError:
            await session.rollback()
            print("User with this email already exists")

        try:
            # Attempt to get non-existent user
            stmt = select(User).where(User.id == 99999)
            result = await session.execute(stmt)
            user = result.scalar_one()  # Raises if not found
        except NoResultFound:
            print("User not found")

    await engine.dispose()


# Different async drivers for different databases:

# PostgreSQL with asyncpg
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")

# MySQL with aiomysql
engine = create_async_engine("mysql+aiomysql://user:pass@host/db")

# SQLite with aiosqlite
engine = create_async_engine("sqlite+aiosqlite:///path/to/database.db")

# Note: The underlying DBAPI driver must support async operations
# Standard drivers like psycopg2, PyMySQL, sqlite3 do NOT work with async engines

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-sqlalchemy