CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ydb

Officially supported Python client for YDB distributed SQL database

Overview
Eval results
Files

async-operations.mddocs/

Async Operations

Full async/await interface providing asynchronous versions of all core functionality through the ydb.aio module.

Capabilities

Async Driver

Asynchronous database driver with context manager support and automatic connection management.

import ydb.aio as ydb_aio

class Driver:
    def __init__(
        self,
        endpoint: str,
        database: str,
        credentials: Credentials = None,
        **kwargs
    ):
        """
        Create asynchronous YDB driver.
        
        Args:
            endpoint (str): YDB cluster endpoint
            database (str): Database path
            credentials (Credentials, optional): Authentication credentials
            **kwargs: Additional driver configuration
        """

    async def __aenter__(self) -> 'Driver':
        """
        Enter async context manager.
        
        Returns:
            Driver: Initialized driver instance
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Exit async context manager and cleanup resources.
        """

    async def wait(self, fail_fast: bool = True, timeout: float = None) -> bool:
        """
        Wait for driver to be ready asynchronously.
        
        Args:
            fail_fast (bool): Fail immediately on first error
            timeout (float, optional): Maximum wait time in seconds
            
        Returns:
            bool: True if driver is ready, False on timeout
        """

    async def stop(self, timeout: float = None):
        """
        Stop the driver and cleanup resources.
        
        Args:
            timeout (float, optional): Shutdown timeout in seconds
        """

    @property
    def discovery_debug_details(self) -> str:
        """Get discovery debug information."""

    def scheme_client(self) -> 'SchemeClient':
        """
        Create async scheme client for schema operations.
        
        Returns:
            SchemeClient: Async scheme client instance
        """

    def table_client(self) -> 'TableClient':
        """
        Create async table client for table operations.
        
        Returns:
            TableClient: Async table client instance
        """

    def query_session_pool(self, **kwargs) -> 'QuerySessionPool':
        """
        Create async query session pool.
        
        Returns:
            QuerySessionPool: Async query session pool
        """

Async Session Pool

Asynchronous session pool with automatic session lifecycle management and retry capabilities.

class SessionPool:
    def __init__(
        self,
        driver: Driver,
        size: int = None,
        creation_timeout: float = None,
        **kwargs
    ):
        """
        Create asynchronous session pool.
        
        Args:
            driver (Driver): Async YDB driver instance
            size (int, optional): Maximum pool size
            creation_timeout (float, optional): Session creation timeout
        """

    async def __aenter__(self) -> 'SessionPool':
        """
        Enter async context manager.
        
        Returns:
            SessionPool: Initialized session pool
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Exit async context manager and stop pool.
        """

    async def acquire(self, timeout: float = None) -> 'Session':
        """
        Acquire session from pool asynchronously.
        
        Args:
            timeout (float, optional): Acquisition timeout
            
        Returns:
            Session: Available async session
        """

    async def release(self, session: 'Session'):
        """
        Release session back to pool.
        
        Args:
            session (Session): Session to release
        """

    async def retry_operation(
        self,
        callee: Callable[['Session'], Awaitable[Any]],
        retry_settings: RetrySettings = None,
        *args,
        **kwargs
    ) -> Any:
        """
        Execute async operation with automatic retry and session management.
        
        Args:
            callee (Callable): Async function to execute with session
            retry_settings (RetrySettings, optional): Custom retry configuration
            *args: Additional arguments for callee
            **kwargs: Additional keyword arguments for callee
            
        Returns:
            Any: Result of callee execution
        """

    async def stop(self, timeout: float = None):
        """
        Stop the session pool and close all sessions.
        
        Args:
            timeout (float, optional): Shutdown timeout
        """

    def checkout(self) -> 'AsyncSessionCheckout':
        """
        Create async session checkout context manager.
        
        Returns:
            AsyncSessionCheckout: Async session context manager
        """

class AsyncSessionCheckout:
    def __init__(self, pool: SessionPool):
        """
        Async context manager for session checkout.
        
        Args:
            pool (SessionPool): Parent session pool
        """

    async def __aenter__(self) -> 'Session':
        """
        Acquire session from pool.
        
        Returns:
            Session: Available session
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Release session back to pool.
        """

Async Sessions

Asynchronous database session for query execution and transaction management.

class Session:
    def __init__(self, driver: Driver):
        """
        Create asynchronous database session.
        
        Args:
            driver (Driver): Async YDB driver instance
        """

    async def __aenter__(self) -> 'Session':
        """
        Enter async context manager.
        
        Returns:
            Session: Initialized session
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Exit async context manager and close session.
        """

    async def create_table(
        self,
        path: str,
        table_description: TableDescription,
        settings: CreateTableSettings = None
    ):
        """
        Create table asynchronously.
        
        Args:
            path (str): Table path
            table_description (TableDescription): Table structure definition
            settings (CreateTableSettings, optional): Creation settings
        """

    async def drop_table(self, path: str, settings: DropTableSettings = None):
        """
        Drop table asynchronously.
        
        Args:
            path (str): Table path
            settings (DropTableSettings, optional): Drop settings
        """

    async def alter_table(
        self,
        path: str,
        alter_table_settings: AlterTableSettings
    ):
        """
        Alter table structure asynchronously.
        
        Args:
            path (str): Table path
            alter_table_settings (AlterTableSettings): Alteration settings
        """

    async def copy_table(
        self,
        source_path: str,
        destination_path: str,
        settings: CopyTableSettings = None
    ):
        """
        Copy table asynchronously.
        
        Args:
            source_path (str): Source table path
            destination_path (str): Destination table path
            settings (CopyTableSettings, optional): Copy settings
        """

    async def describe_table(
        self,
        path: str,
        settings: DescribeTableSettings = None
    ) -> TableDescription:
        """
        Describe table structure asynchronously.
        
        Args:
            path (str): Table path
            settings (DescribeTableSettings, optional): Describe settings
            
        Returns:
            TableDescription: Table structure information
        """

    async def execute_query(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        settings: ExecuteQuerySettings = None
    ) -> List[ResultSet]:
        """
        Execute YQL query asynchronously.
        
        Args:
            query (str): YQL query text
            parameters (Dict[str, Any], optional): Query parameters
            settings (ExecuteQuerySettings, optional): Execution settings
            
        Returns:
            List[ResultSet]: Query results
        """

    async def execute_scheme_query(self, query: str):
        """
        Execute scheme query asynchronously.
        
        Args:
            query (str): Scheme query text (DDL)
        """

    async def prepare_query(
        self,
        query: str,
        settings: PrepareQuerySettings = None
    ) -> DataQuery:
        """
        Prepare query for execution asynchronously.
        
        Args:
            query (str): YQL query text
            settings (PrepareQuerySettings, optional): Preparation settings
            
        Returns:
            DataQuery: Prepared query object
        """

    async def transaction(self, tx_mode: TxMode = None) -> 'AsyncTxContext':
        """
        Begin transaction asynchronously.
        
        Args:
            tx_mode (TxMode, optional): Transaction mode
            
        Returns:
            AsyncTxContext: Async transaction context
        """

    async def read_table(
        self,
        path: str,
        key_range: KeyRange = None,
        columns: List[str] = None,
        settings: ReadTableSettings = None
    ) -> AsyncIterator[ResultSet]:
        """
        Read table data asynchronously with streaming.
        
        Args:
            path (str): Table path
            key_range (KeyRange, optional): Key range to read
            columns (List[str], optional): Columns to read
            settings (ReadTableSettings, optional): Read settings
            
        Returns:
            AsyncIterator[ResultSet]: Streaming result sets
        """

    async def bulk_upsert(
        self,
        path: str,
        rows: Union[List[Dict], pd.DataFrame],
        column_types: Dict[str, Type] = None,
        settings: BulkUpsertSettings = None
    ):
        """
        Bulk upsert data asynchronously.
        
        Args:
            path (str): Table path
            rows (Union[List[Dict], pd.DataFrame]): Data to upsert
            column_types (Dict[str, Type], optional): Column type overrides
            settings (BulkUpsertSettings, optional): Upsert settings
        """

    async def close(self):
        """
        Close session and release resources asynchronously.
        """

    @property
    def session_id(self) -> str:
        """Get session identifier."""

Async Transaction Context

Asynchronous transaction management with automatic commit/rollback handling.

class AsyncTxContext:
    def __init__(self, session: Session, tx_mode: TxMode = None):
        """
        Asynchronous transaction context.
        
        Args:
            session (Session): Parent async session
            tx_mode (TxMode, optional): Transaction isolation mode
        """

    async def __aenter__(self) -> 'AsyncTxContext':
        """
        Enter async transaction context.
        
        Returns:
            AsyncTxContext: Transaction context
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Exit async transaction context with automatic commit/rollback.
        """

    async def execute(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        commit_tx: bool = False,
        settings: ExecuteQuerySettings = None
    ) -> List[ResultSet]:
        """
        Execute query within transaction asynchronously.
        
        Args:
            query (str): YQL query text
            parameters (Dict[str, Any], optional): Query parameters
            commit_tx (bool): Commit transaction after execution
            settings (ExecuteQuerySettings, optional): Execution settings
            
        Returns:
            List[ResultSet]: Query results
        """

    async def commit(self, settings: CommitTxSettings = None):
        """
        Commit transaction asynchronously.
        
        Args:
            settings (CommitTxSettings, optional): Commit settings
        """

    async def rollback(self, settings: RollbackTxSettings = None):
        """
        Rollback transaction asynchronously.
        
        Args:
            settings (RollbackTxSettings, optional): Rollback settings
        """

    @property
    def tx_id(self) -> str:
        """Get transaction identifier."""

Async Query Service

Asynchronous query service with modern YQL interface and session pooling.

class QuerySessionPool:
    def __init__(
        self,
        driver: Driver,
        size: int = None,
        query_client_settings: QueryClientSettings = None
    ):
        """
        Asynchronous query session pool.
        
        Args:
            driver (Driver): Async YDB driver instance
            size (int, optional): Maximum pool size
            query_client_settings (QueryClientSettings, optional): Default settings
        """

    async def __aenter__(self) -> 'QuerySessionPool':
        """Enter async context manager."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async context manager."""

    async def acquire(self, timeout: float = None) -> 'QuerySession':
        """
        Acquire query session from pool asynchronously.
        
        Args:
            timeout (float, optional): Acquisition timeout
            
        Returns:
            QuerySession: Available async query session
        """

    async def release(self, session: 'QuerySession'):
        """
        Release query session back to pool.
        
        Args:
            session (QuerySession): Session to release
        """

    async def retry_operation(
        self,
        callee: Callable[['QuerySession'], Awaitable[Any]],
        retry_settings: RetrySettings = None,
        *args,
        **kwargs
    ) -> Any:
        """
        Execute async operation with automatic retry.
        
        Args:
            callee (Callable): Async function to execute
            retry_settings (RetrySettings, optional): Retry configuration
            
        Returns:
            Any: Result of callee execution
        """

    async def stop(self, timeout: float = None):
        """Stop the query session pool."""

class QuerySession:
    def __init__(self, driver: Driver, settings: QueryClientSettings = None):
        """
        Asynchronous query session.
        
        Args:
            driver (Driver): Async YDB driver instance
            settings (QueryClientSettings, optional): Session configuration
        """

    async def execute_query(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        tx_control: QueryTxControl = None,
        settings: ExecuteQuerySettings = None
    ) -> AsyncIterator[ResultSet]:
        """
        Execute query asynchronously with streaming results.
        
        Args:
            query (str): YQL query text
            parameters (Dict[str, Any], optional): Query parameters
            tx_control (QueryTxControl, optional): Transaction control
            settings (ExecuteQuerySettings, optional): Execution settings
            
        Returns:
            AsyncIterator[ResultSet]: Streaming query results
        """

    async def transaction(
        self,
        tx_settings: QueryTxSettings = None
    ) -> 'AsyncQueryTxContext':
        """
        Begin async query transaction.
        
        Args:
            tx_settings (QueryTxSettings, optional): Transaction settings
            
        Returns:
            AsyncQueryTxContext: Async transaction context
        """

    async def close(self):
        """Close async query session."""

class AsyncQueryTxContext:
    async def __aenter__(self) -> 'AsyncQueryTxContext':
        """Enter async transaction context."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async transaction context."""

    async def execute(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        settings: ExecuteQuerySettings = None
    ) -> AsyncIterator[ResultSet]:
        """Execute query within async transaction."""

    async def commit(self):
        """Commit async transaction."""

    async def rollback(self):
        """Rollback async transaction."""

Async Scheme Operations

Asynchronous schema and directory operations client.

class SchemeClient:
    def __init__(self, driver: Driver):
        """
        Asynchronous scheme client for schema operations.
        
        Args:
            driver (Driver): Async YDB driver instance
        """

    async def make_directory(
        self,
        path: str,
        settings: MakeDirectorySettings = None
    ):
        """
        Create directory asynchronously.
        
        Args:
            path (str): Directory path
            settings (MakeDirectorySettings, optional): Creation settings
        """

    async def remove_directory(
        self,
        path: str,
        settings: RemoveDirectorySettings = None
    ):
        """
        Remove directory asynchronously.
        
        Args:
            path (str): Directory path
            settings (RemoveDirectorySettings, optional): Removal settings
        """

    async def list_directory(
        self,
        path: str,
        settings: ListDirectorySettings = None
    ) -> Directory:
        """
        List directory contents asynchronously.
        
        Args:
            path (str): Directory path
            settings (ListDirectorySettings, optional): Listing settings
            
        Returns:
            Directory: Directory information with entries
        """

    async def describe_path(
        self,
        path: str,
        settings: DescribePathSettings = None
    ) -> SchemeEntry:
        """
        Describe path entry asynchronously.
        
        Args:
            path (str): Entry path
            settings (DescribePathSettings, optional): Describe settings
            
        Returns:
            SchemeEntry: Path entry information
        """

    async def modify_permissions(
        self,
        path: str,
        permissions: Permissions,
        settings: ModifyPermissionsSettings = None
    ):
        """
        Modify path permissions asynchronously.
        
        Args:
            path (str): Entry path
            permissions (Permissions): Permission changes
            settings (ModifyPermissionsSettings, optional): Modify settings
        """

Async Retry Operations

Asynchronous retry functionality with backoff and error handling.

async def retry_operation(
    callee: Callable[..., Awaitable[Any]],
    retry_settings: RetrySettings = None,
    session_pool: SessionPool = None,
    *args,
    **kwargs
) -> Any:
    """
    Execute async operation with retry logic.
    
    Args:
        callee (Callable): Async function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        session_pool (SessionPool, optional): Session pool for session-based operations
        *args: Additional arguments for callee
        **kwargs: Additional keyword arguments for callee
        
    Returns:
        Any: Result of successful callee execution
    """

class AsyncRetrySettings:
    def __init__(
        self,
        max_retries: int = 10,
        max_session_acquire_timeout: float = None,
        fast_backoff_settings: BackoffSettings = None,
        slow_backoff_settings: BackoffSettings = None,
        **kwargs
    ):
        """
        Retry settings for async operations.
        
        Args:
            max_retries (int): Maximum number of retry attempts
            max_session_acquire_timeout (float, optional): Session acquisition timeout
            fast_backoff_settings (BackoffSettings, optional): Fast backoff configuration
            slow_backoff_settings (BackoffSettings, optional): Slow backoff configuration
        """

Usage Examples

Basic Async Driver Usage

import asyncio
import ydb.aio as ydb_aio

async def main():
    # Create async driver with context manager
    async with ydb_aio.Driver(
        endpoint="grpc://localhost:2136",
        database="/local",
        credentials=ydb.AnonymousCredentials()
    ) as driver:
        # Wait for driver to be ready
        await driver.wait(fail_fast=True, timeout=5)
        
        # Create session pool
        async with ydb_aio.SessionPool(driver) as pool:
            # Execute operation
            async def query_operation(session):
                result_sets = await session.execute_query("SELECT 1 AS value")
                return [row.value for row in result_sets[0].rows]
            
            results = await pool.retry_operation(query_operation)
            print(f"Results: {results}")

# Run async main function
asyncio.run(main())

Async Transaction Management

async def transfer_funds(session, from_account, to_account, amount):
    # Execute multiple queries in async transaction
    async with await session.transaction(ydb.SerializableReadWrite()) as tx:
        # Debit from source account
        await tx.execute(
            """
            UPDATE accounts 
            SET balance = balance - $amount 
            WHERE account_id = $from_account
            """,
            parameters={
                "$from_account": from_account,
                "$amount": amount
            }
        )
        
        # Credit to destination account
        await tx.execute(
            """
            UPDATE accounts 
            SET balance = balance + $amount 
            WHERE account_id = $to_account
            """,
            parameters={
                "$to_account": to_account,
                "$amount": amount
            }
        )
        # Transaction automatically commits on context exit

# Usage with session pool
async with pool.checkout() as session:
    await transfer_funds(session, "acc1", "acc2", 100.0)

Async Query Service

async def execute_analytics_query():
    async with ydb_aio.Driver(...) as driver:
        query_pool = driver.query_session_pool(size=5)
        
        async with query_pool as pool:
            async def analytics_operation(session):
                query = """
                    SELECT 
                        DATE_TRUNC('month', created_at) as month,
                        COUNT(*) as orders_count,
                        SUM(total_amount) as total_revenue
                    FROM orders
                    WHERE created_at >= $start_date
                    GROUP BY month
                    ORDER BY month
                """
                
                parameters = {"$start_date": datetime(2024, 1, 1)}
                
                async for result_set in session.execute_query(query, parameters):
                    async for row in result_set.rows:
                        print(f"Month: {row.month}, Orders: {row.orders_count}, Revenue: {row.total_revenue}")
            
            await pool.retry_operation(analytics_operation)

Async Bulk Operations

async def bulk_insert_users(session, user_data):
    # Prepare bulk data
    rows = [
        {"user_id": user["id"], "name": user["name"], "email": user["email"]}
        for user in user_data
    ]
    
    # Define column types
    column_types = {
        "user_id": ydb.PrimitiveType.Uint64,
        "name": ydb.PrimitiveType.Utf8,
        "email": ydb.PrimitiveType.Utf8
    }
    
    # Perform bulk upsert asynchronously
    await session.bulk_upsert(
        "/local/users",
        rows,
        column_types=column_types
    )

# Usage
async with pool.checkout() as session:
    await bulk_insert_users(session, user_data_list)

Async Schema Operations

async def setup_database_schema():
    async with ydb_aio.Driver(...) as driver:
        scheme_client = driver.scheme_client()
        
        # Create directories
        await scheme_client.make_directory("/local/app")
        await scheme_client.make_directory("/local/app/tables")
        
        # List directory contents
        directory = await scheme_client.list_directory("/local/app")
        for entry in directory.children:
            print(f"Entry: {entry.name}, Type: {entry.type}")
        
        # Create table through session
        async with ydb_aio.SessionPool(driver) as pool:
            async def create_table_operation(session):
                table_description = (
                    ydb.TableDescription()
                    .with_column(ydb.TableColumn("id", ydb.OptionalType(ydb.PrimitiveType.Uint64)))
                    .with_column(ydb.TableColumn("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)))
                    .with_primary_key("id")
                )
                
                await session.create_table("/local/app/tables/users", table_description)
            
            await pool.retry_operation(create_table_operation)

Type Definitions

# Type aliases for async operations
AsyncQueryCallback = Callable[['Session'], Awaitable[Any]]
AsyncQuerySessionCallback = Callable[['QuerySession'], Awaitable[Any]]
AsyncResultIterator = AsyncIterator[ResultSet]

# Common async context managers
AsyncDriverContext = AsyncContextManager[Driver]
AsyncSessionPoolContext = AsyncContextManager[SessionPool]
AsyncSessionContext = AsyncContextManager[Session]
AsyncTxContextManager = AsyncContextManager[AsyncTxContext]

Install with Tessl CLI

npx tessl i tessl/pypi-ydb

docs

async-operations.md

authentication.md

data-types.md

dbapi-interface.md

driver-connection.md

error-handling.md

index.md

query-service.md

schema-operations.md

sqlalchemy-integration.md

table-operations.md

topic-operations.md

tile.json