CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-sqlalchemy

Python SQL toolkit and Object Relational Mapper providing full power and flexibility of SQL

Overview
Eval results
Files

async.mddocs/

Async Support

Asynchronous database operations with async engines, connections, sessions, and ORM support for modern async Python applications. SQLAlchemy's async support enables non-blocking database operations in asyncio applications.

Capabilities

Async Engine Creation

Create asynchronous database engines for async/await database operations.

async def create_async_engine(url, **kwargs):
    """
    Create asynchronous database engine.
    
    Parameters:
    - url: str or URL, database connection URL with async driver
    - echo: bool, log all SQL statements
    - pool_size: int, connection pool size
    - max_overflow: int, maximum pool overflow
    - pool_timeout: int, connection timeout in seconds
    - pool_recycle: int, connection recycle time
    
    Returns:
    AsyncEngine: Asynchronous database engine
    """

class AsyncEngine:
    """Asynchronous database engine with connection pooling."""
    
    async def connect(self):
        """
        Create new async database connection.
        
        Returns:
        AsyncConnection: New async database connection
        """
    
    async def execute(self, statement, parameters=None):
        """
        Execute statement with automatic connection management.
        
        Parameters:
        - statement: str or executable, SQL statement
        - parameters: dict or sequence, bound parameters
        
        Returns:
        Result: Query results
        """
    
    async def begin(self):
        """
        Begin transaction with automatic connection management.
        
        Returns:
        AsyncTransaction: Async transaction context manager
        """
    
    async def dispose(self):
        """Close all connections and dispose of connection pool."""
    
    def sync_engine(self):
        """
        Get synchronous engine for metadata operations.
        
        Returns:
        Engine: Synchronous engine for DDL operations
        """
    
    @property
    def dialect(self):
        """Database dialect for this engine."""

Async Connection Management

Asynchronous connection handling with transaction support.

class AsyncConnection:
    """Asynchronous database connection with transaction support."""
    
    async def execute(self, statement, parameters=None):
        """
        Execute SQL statement on this async connection.
        
        Parameters:
        - statement: str or executable, SQL statement
        - parameters: dict or sequence, bound parameters
        
        Returns:
        Result: Query results
        """
    
    async def begin(self):
        """
        Begin transaction on this connection.
        
        Returns:
        AsyncTransaction: Async transaction object
        """
    
    async def commit(self):
        """Commit current transaction."""
    
    async def rollback(self):
        """Rollback current transaction."""
    
    async def close(self):
        """Close this async connection."""
    
    async def scalar(self, statement, parameters=None):
        """
        Execute statement and return scalar result.
        
        Parameters:
        - statement: str or executable, SQL statement
        - parameters: dict or sequence, bound parameters
        
        Returns:
        Any: Single scalar value
        """
    
    def get_transaction(self):
        """
        Get current transaction for this connection.
        
        Returns:
        AsyncTransaction or None: Current transaction
        """

Async Transaction Management

Asynchronous transaction handling with context manager support.

class AsyncTransaction:
    """Asynchronous database transaction with rollback support."""
    
    async def commit(self):
        """Commit this async transaction."""
    
    async def rollback(self):
        """Rollback this async transaction."""
    
    async def close(self):
        """Close transaction (rollback if not committed)."""
    
    def is_active(self):
        """
        Check if transaction is active.
        
        Returns:
        bool: True if transaction is active
        """

Async ORM Session

Asynchronous ORM session with identity map and unit of work patterns.

class AsyncSession:
    """Asynchronous ORM session with identity map and unit of work."""
    
    def __init__(self, bind, **kwargs):
        """
        Create async ORM session.
        
        Parameters:
        - bind: AsyncEngine for database operations
        - autoflush: bool, auto-flush before queries (default True)
        - expire_on_commit: bool, expire objects after commit (default True)
        """
    
    def add(self, instance):
        """
        Add object instance to session.
        
        Parameters:
        - instance: mapped object to add
        """
    
    def add_all(self, instances):
        """
        Add multiple object instances to session.
        
        Parameters:
        - instances: iterable of mapped objects
        """
    
    async def delete(self, instance):
        """
        Mark object instance for deletion.
        
        Parameters:
        - instance: mapped object to delete
        """
    
    async def commit(self):
        """Flush pending changes and commit transaction."""
    
    async def rollback(self):
        """Rollback current transaction and expire all objects."""
    
    async def flush(self):
        """Flush pending changes to database without committing."""
    
    def expunge(self, instance):
        """
        Remove instance from session without deleting.
        
        Parameters:
        - instance: mapped object to remove from session
        """
    
    def expunge_all(self):
        """Remove all instances from session."""
    
    async def refresh(self, instance, attribute_names=None):
        """
        Refresh object from database.
        
        Parameters:
        - instance: mapped object to refresh
        - attribute_names: specific attributes to refresh
        """
    
    async def merge(self, instance):
        """
        Merge detached instance into session.
        
        Parameters:
        - instance: detached mapped object
        
        Returns:
        object: Merged persistent instance
        """
    
    async def execute(self, statement, parameters=None, **kwargs):
        """
        Execute statement with ORM-level processing.
        
        Parameters:
        - statement: SQL statement or ORM query
        - parameters: bind parameters
        
        Returns:
        Result: Query results
        """
    
    async def scalar(self, statement, parameters=None, **kwargs):
        """
        Execute statement and return scalar result.
        
        Parameters:
        - statement: SQL statement or ORM query
        - parameters: bind parameters
        
        Returns:
        Any: Scalar result value
        """
    
    async def get(self, entity, ident):
        """
        Get object by primary key.
        
        Parameters:
        - entity: mapped class
        - ident: primary key value or tuple
        
        Returns:
        object or None: Object instance or None if not found
        """
    
    async def stream(self, statement):
        """
        Execute statement and return async result stream.
        
        Parameters:
        - statement: SQL statement or ORM query
        
        Returns:
        AsyncResult: Streaming query results
        """
    
    async def close(self):
        """Close the async session."""

def async_sessionmaker(bind=None, **kwargs):
    """
    Create AsyncSession factory.
    
    Parameters:
    - bind: AsyncEngine for database operations
    - kwargs: AsyncSession configuration options
    
    Returns:
    async_sessionmaker: AsyncSession factory class
    """

def async_scoped_session(session_factory):
    """
    Create async scoped session with context-local storage.
    
    Parameters:
    - session_factory: async_sessionmaker instance
    
    Returns:
    async_scoped_session: Context-local async session proxy
    """

Async Result Processing

Asynchronous result iteration and processing.

class AsyncResult:
    """Asynchronous query result with async iteration."""
    
    async def fetchone(self):
        """
        Fetch next row asynchronously.
        
        Returns:
        Row or None: Next row or None if no more rows
        """
    
    async def fetchmany(self, size=None):
        """
        Fetch multiple rows asynchronously.
        
        Parameters:
        - size: int, number of rows to fetch
        
        Returns:
        List[Row]: List of rows
        """
    
    async def fetchall(self):
        """
        Fetch all remaining rows asynchronously.
        
        Returns:
        List[Row]: All remaining rows
        """
    
    async def scalar(self):
        """
        Fetch scalar value from first column of first row.
        
        Returns:
        Any: Scalar value or None
        """
    
    def mappings(self):
        """
        Return result as async mapping-like objects.
        
        Returns:
        AsyncMappingResult: Result with dict-like row access
        """
    
    def scalars(self, index=0):
        """
        Return result as async scalar values.
        
        Parameters:
        - index: int, column index for scalar extraction
        
        Returns:
        AsyncScalarResult: Result with scalar value iteration
        """
    
    def partitions(self, size=None):
        """
        Partition result into chunks for async processing.
        
        Parameters:
        - size: int, partition size
        
        Returns:
        AsyncIterator: Async iterator of row partitions
        """
    
    async def __aiter__(self):
        """Async iterator support for result rows."""
    
    async def __anext__(self):
        """Get next row in async iteration."""

class AsyncScalarResult:
    """Async result optimized for scalar value iteration."""
    
    async def all(self):
        """
        Fetch all scalar values.
        
        Returns:
        List[Any]: All scalar values
        """
    
    async def first(self):
        """
        Fetch first scalar value.
        
        Returns:
        Any or None: First scalar value or None
        """
    
    async def one(self):
        """
        Fetch exactly one scalar value.
        
        Returns:
        Any: Single scalar value
        
        Raises:
        NoResultFound: If no results
        MultipleResultsFound: If multiple results
        """
    
    async def one_or_none(self):
        """
        Fetch one scalar value or None.
        
        Returns:
        Any or None: Single scalar value or None
        
        Raises:
        MultipleResultsFound: If multiple results
        """

Async Utilities and Helpers

Utility functions for async SQLAlchemy operations.

async def run_sync(fn, *args, **kwargs):
    """
    Run synchronous function in async context.
    
    Parameters:
    - fn: synchronous function to run
    - args: positional arguments
    - kwargs: keyword arguments
    
    Returns:
    Any: Function result
    """

def greenlet_spawn(fn, *args, **kwargs):
    """
    Spawn greenlet for sync operations in async context.
    
    Parameters:
    - fn: function to run in greenlet
    - args: positional arguments
    - kwargs: keyword arguments
    
    Returns:
    Any: Function result
    """

Usage Examples

Basic Async Engine Usage

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from sqlalchemy import String, Integer, select

# Define models
class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = 'users'
    
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(100))

async def main():
    # Create async engine (note: driver must support async)
    engine = create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/dbname",
        echo=True
    )
    
    # Create tables (using sync engine)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    
    # Use async session
    async with AsyncSession(engine) as session:
        # Add new user
        new_user = User(name="Alice", email="alice@example.com")
        session.add(new_user)
        await session.commit()
        
        # Query users
        stmt = select(User).where(User.name.like('%Alice%'))
        result = await session.execute(stmt)
        users = result.scalars().all()
        
        for user in users:
            print(f"User: {user.name}, Email: {user.email}")
    
    await engine.dispose()

# Run async function
asyncio.run(main())

Async Connection Context Management

async def database_operations():
    engine = create_async_engine("sqlite+aiosqlite:///async_example.db")
    
    # Direct connection usage
    async with engine.connect() as conn:
        # Execute raw SQL
        result = await conn.execute(text("SELECT 1"))
        value = result.scalar()
        print(f"Result: {value}")
        
        # Transaction management
        async with conn.begin():
            await conn.execute(
                users.insert().values(name="Bob", email="bob@example.com")
            )
            # Automatically committed
    
    await engine.dispose()

Streaming Results

async def stream_large_dataset():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    
    async with AsyncSession(engine) as session:
        # Stream results for memory efficiency
        stmt = select(User).where(User.active == True)
        stream = await session.stream(stmt)
        
        async for user in stream.scalars():
            print(f"Processing user: {user.name}")
            # Process user without loading all into memory
    
    await engine.dispose()

Async Session Factory

from sqlalchemy.ext.asyncio import async_sessionmaker

# Create reusable session factory
async_session = async_sessionmaker(
    create_async_engine("postgresql+asyncpg://user:pass@localhost/db"),
    expire_on_commit=False
)

async def get_user_by_id(user_id: int):
    async with async_session() as session:
        return await session.get(User, user_id)

async def create_user(name: str, email: str):
    async with async_session() as session:
        user = User(name=name, email=email)
        session.add(user)
        await session.commit()
        return user

Mixing Sync and Async Operations

async def mixed_operations():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    
    # Some operations need sync engine (like metadata operations)
    sync_engine = engine.sync_engine
    
    # Create tables with sync engine
    Base.metadata.create_all(sync_engine)
    
    # Use async engine for data operations
    async with AsyncSession(engine) as session:
        stmt = select(User).limit(10)
        result = await session.execute(stmt)
        users = result.scalars().all()
        
        for user in users:
            print(f"User: {user.name}")
    
    await engine.dispose()

Error Handling in Async Context

from sqlalchemy.exc import IntegrityError, NoResultFound

async def safe_user_operations():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
    
    async with AsyncSession(engine) as session:
        try:
            # Attempt to create user with duplicate email
            user = User(name="Test", email="existing@example.com")
            session.add(user)
            await session.commit()
        except IntegrityError:
            await session.rollback()
            print("User with this email already exists")
        
        try:
            # Attempt to get non-existent user
            stmt = select(User).where(User.id == 99999)
            result = await session.execute(stmt)
            user = result.scalar_one()  # Raises if not found
        except NoResultFound:
            print("User not found")
    
    await engine.dispose()

Async Database Driver Requirements

# Different async drivers for different databases:

# PostgreSQL with asyncpg
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")

# MySQL with aiomysql  
engine = create_async_engine("mysql+aiomysql://user:pass@host/db")

# SQLite with aiosqlite
engine = create_async_engine("sqlite+aiosqlite:///path/to/database.db")

# Note: The underlying DBAPI driver must support async operations
# Standard drivers like psycopg2, PyMySQL, sqlite3 do NOT work with async engines

Install with Tessl CLI

npx tessl i tessl/pypi-sqlalchemy

docs

async.md

core-engine.md

dialects.md

index.md

orm.md

schema.md

sql-expression.md

types.md

tile.json