Officially supported Python client for the YDB distributed SQL database.
Full async/await interface providing asynchronous versions of all core functionality through the ydb.aio module.
Asynchronous database driver with context manager support and automatic connection management.
import ydb.aio as ydb_aio
class Driver:
    """Asynchronous database driver with async context manager support."""

    def __init__(
        self,
        endpoint: str,
        database: str,
        credentials: Credentials = None,
        **kwargs
    ):
        """
        Create asynchronous YDB driver.

        Args:
            endpoint (str): YDB cluster endpoint
            database (str): Database path
            credentials (Credentials, optional): Authentication credentials
            **kwargs: Additional driver configuration
        """

    async def __aenter__(self) -> 'Driver':
        """
        Enter async context manager.

        Returns:
            Driver: Initialized driver instance
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Exit async context manager and cleanup resources.
        """

    async def wait(self, fail_fast: bool = True, timeout: float = None) -> bool:
        """
        Wait for driver to be ready asynchronously.

        Args:
            fail_fast (bool): Fail immediately on first error
            timeout (float, optional): Maximum wait time in seconds

        Returns:
            bool: True if driver is ready, False on timeout
        """

    async def stop(self, timeout: float = None):
        """
        Stop the driver and cleanup resources.

        Args:
            timeout (float, optional): Shutdown timeout in seconds
        """

    @property
    def discovery_debug_details(self) -> str:
        """Get discovery debug information."""

    # The factory methods below are plain (non-async) methods: call them
    # without ``await``.
    def scheme_client(self) -> 'SchemeClient':
        """
        Create async scheme client for schema operations.

        Returns:
            SchemeClient: Async scheme client instance
        """

    def table_client(self) -> 'TableClient':
        """
        Create async table client for table operations.

        Returns:
            TableClient: Async table client instance
        """

    def query_session_pool(self, **kwargs) -> 'QuerySessionPool':
        """
        Create async query session pool.

        Args:
            **kwargs: Forwarded pool options -- presumably the
                ``QuerySessionPool`` constructor arguments; TODO confirm

        Returns:
            QuerySessionPool: Async query session pool
        """


# Asynchronous session pool with automatic session lifecycle management and retry capabilities.
class SessionPool:
    """Pool of asynchronous sessions, usable as an async context manager."""

    def __init__(
        self,
        driver: Driver,
        size: int = None,
        creation_timeout: float = None,
        **kwargs
    ):
        """
        Create asynchronous session pool.

        Args:
            driver (Driver): Async YDB driver instance
            size (int, optional): Maximum pool size
            creation_timeout (float, optional): Session creation timeout
            **kwargs: Additional pool options (not described in this reference)
        """

    async def __aenter__(self) -> 'SessionPool':
        """
        Enter async context manager.

        Returns:
            SessionPool: Initialized session pool
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Exit async context manager and stop pool.
        """

    async def acquire(self, timeout: float = None) -> 'Session':
        """
        Acquire session from pool asynchronously.

        Args:
            timeout (float, optional): Acquisition timeout

        Returns:
            Session: Available async session
        """

    async def release(self, session: 'Session'):
        """
        Release session back to pool.

        Args:
            session (Session): Session to release
        """

    async def retry_operation(
        self,
        callee: Callable[['Session'], Awaitable[Any]],
        retry_settings: RetrySettings = None,
        *args,
        **kwargs
    ) -> Any:
        """
        Execute async operation with automatic retry and session management.

        Because ``retry_settings`` precedes ``*args``, the second positional
        argument always binds to ``retry_settings``; extra positional
        arguments for ``callee`` can only follow it.

        Args:
            callee (Callable): Async function to execute with session
            retry_settings (RetrySettings, optional): Custom retry configuration
            *args: Additional arguments for callee
            **kwargs: Additional keyword arguments for callee

        Returns:
            Any: Result of callee execution
        """

    async def stop(self, timeout: float = None):
        """
        Stop the session pool and close all sessions.

        Args:
            timeout (float, optional): Shutdown timeout
        """

    def checkout(self) -> 'AsyncSessionCheckout':
        """
        Create async session checkout context manager.

        This is a plain method: use ``async with pool.checkout() as session``
        without awaiting the call itself.

        Returns:
            AsyncSessionCheckout: Async session context manager
        """
class AsyncSessionCheckout:
    """Async context manager that borrows one session from a pool."""

    def __init__(self, pool: SessionPool):
        """
        Async context manager for session checkout.

        Args:
            pool (SessionPool): Parent session pool
        """

    async def __aenter__(self) -> 'Session':
        """
        Acquire session from pool.

        Returns:
            Session: Available session
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Release session back to pool.
        """


# Asynchronous database session for query execution and transaction management.
class Session:
    """Asynchronous database session, usable as an async context manager."""

    def __init__(self, driver: Driver):
        """
        Create asynchronous database session.

        Args:
            driver (Driver): Async YDB driver instance
        """

    async def __aenter__(self) -> 'Session':
        """
        Enter async context manager.

        Returns:
            Session: Initialized session
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Exit async context manager and close session.
        """

    async def create_table(
        self,
        path: str,
        table_description: TableDescription,
        settings: CreateTableSettings = None
    ):
        """
        Create table asynchronously.

        Args:
            path (str): Table path
            table_description (TableDescription): Table structure definition
            settings (CreateTableSettings, optional): Creation settings
        """

    async def drop_table(self, path: str, settings: DropTableSettings = None):
        """
        Drop table asynchronously.

        Args:
            path (str): Table path
            settings (DropTableSettings, optional): Drop settings
        """

    async def alter_table(
        self,
        path: str,
        alter_table_settings: AlterTableSettings
    ):
        """
        Alter table structure asynchronously.

        Args:
            path (str): Table path
            alter_table_settings (AlterTableSettings): Alteration settings
        """

    async def copy_table(
        self,
        source_path: str,
        destination_path: str,
        settings: CopyTableSettings = None
    ):
        """
        Copy table asynchronously.

        Args:
            source_path (str): Source table path
            destination_path (str): Destination table path
            settings (CopyTableSettings, optional): Copy settings
        """

    async def describe_table(
        self,
        path: str,
        settings: DescribeTableSettings = None
    ) -> TableDescription:
        """
        Describe table structure asynchronously.

        Args:
            path (str): Table path
            settings (DescribeTableSettings, optional): Describe settings

        Returns:
            TableDescription: Table structure information
        """

    async def execute_query(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        settings: ExecuteQuerySettings = None
    ) -> List[ResultSet]:
        """
        Execute YQL query asynchronously.

        Args:
            query (str): YQL query text
            parameters (Dict[str, Any], optional): Query parameters
            settings (ExecuteQuerySettings, optional): Execution settings

        Returns:
            List[ResultSet]: Query results
        """

    async def execute_scheme_query(self, query: str):
        """
        Execute scheme query asynchronously.

        Args:
            query (str): Scheme query text (DDL)
        """

    async def prepare_query(
        self,
        query: str,
        settings: PrepareQuerySettings = None
    ) -> DataQuery:
        """
        Prepare query for execution asynchronously.

        Args:
            query (str): YQL query text
            settings (PrepareQuerySettings, optional): Preparation settings

        Returns:
            DataQuery: Prepared query object
        """

    async def transaction(self, tx_mode: TxMode = None) -> 'AsyncTxContext':
        """
        Begin transaction asynchronously.

        This is a coroutine, so the result must be awaited before it is
        entered: ``async with await session.transaction(...) as tx``.

        Args:
            tx_mode (TxMode, optional): Transaction mode

        Returns:
            AsyncTxContext: Async transaction context
        """

    async def read_table(
        self,
        path: str,
        key_range: KeyRange = None,
        columns: List[str] = None,
        settings: ReadTableSettings = None
    ) -> AsyncIterator[ResultSet]:
        """
        Read table data asynchronously with streaming.

        Args:
            path (str): Table path
            key_range (KeyRange, optional): Key range to read
            columns (List[str], optional): Columns to read
            settings (ReadTableSettings, optional): Read settings

        Returns:
            AsyncIterator[ResultSet]: Streaming result sets
        """

    async def bulk_upsert(
        self,
        path: str,
        rows: Union[List[Dict], pd.DataFrame],
        column_types: Dict[str, Type] = None,
        settings: BulkUpsertSettings = None
    ):
        """
        Bulk upsert data asynchronously.

        Args:
            path (str): Table path
            rows (Union[List[Dict], pd.DataFrame]): Data to upsert
            column_types (Dict[str, Type], optional): Column type overrides
            settings (BulkUpsertSettings, optional): Upsert settings
        """

    async def close(self):
        """
        Close session and release resources asynchronously.
        """

    @property
    def session_id(self) -> str:
        """Get session identifier."""


# Asynchronous transaction management with automatic commit/rollback handling.
class AsyncTxContext:
    """Asynchronous transaction bound to a session."""

    def __init__(self, session: Session, tx_mode: TxMode = None):
        """
        Asynchronous transaction context.

        Args:
            session (Session): Parent async session
            tx_mode (TxMode, optional): Transaction isolation mode
        """

    async def __aenter__(self) -> 'AsyncTxContext':
        """
        Enter async transaction context.

        Returns:
            AsyncTxContext: Transaction context
        """

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        Exit async transaction context with automatic commit/rollback.
        """

    async def execute(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        commit_tx: bool = False,
        settings: ExecuteQuerySettings = None
    ) -> List[ResultSet]:
        """
        Execute query within transaction asynchronously.

        Args:
            query (str): YQL query text
            parameters (Dict[str, Any], optional): Query parameters
            commit_tx (bool): Commit transaction after execution
            settings (ExecuteQuerySettings, optional): Execution settings

        Returns:
            List[ResultSet]: Query results
        """

    async def commit(self, settings: CommitTxSettings = None):
        """
        Commit transaction asynchronously.

        Args:
            settings (CommitTxSettings, optional): Commit settings
        """

    async def rollback(self, settings: RollbackTxSettings = None):
        """
        Rollback transaction asynchronously.

        Args:
            settings (RollbackTxSettings, optional): Rollback settings
        """

    @property
    def tx_id(self) -> str:
        """Get transaction identifier."""


# Asynchronous query service with modern YQL interface and session pooling.
class QuerySessionPool:
    """Pool of asynchronous query sessions, usable as an async context manager."""

    def __init__(
        self,
        driver: Driver,
        size: int = None,
        query_client_settings: QueryClientSettings = None
    ):
        """
        Asynchronous query session pool.

        Args:
            driver (Driver): Async YDB driver instance
            size (int, optional): Maximum pool size
            query_client_settings (QueryClientSettings, optional): Default settings
        """

    async def __aenter__(self) -> 'QuerySessionPool':
        """Enter async context manager."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async context manager."""

    async def acquire(self, timeout: float = None) -> 'QuerySession':
        """
        Acquire query session from pool asynchronously.

        Args:
            timeout (float, optional): Acquisition timeout

        Returns:
            QuerySession: Available async query session
        """

    async def release(self, session: 'QuerySession'):
        """
        Release query session back to pool.

        Args:
            session (QuerySession): Session to release
        """

    async def retry_operation(
        self,
        callee: Callable[['QuerySession'], Awaitable[Any]],
        retry_settings: RetrySettings = None,
        *args,
        **kwargs
    ) -> Any:
        """
        Execute async operation with automatic retry.

        Because ``retry_settings`` precedes ``*args``, the second positional
        argument always binds to ``retry_settings``.

        Args:
            callee (Callable): Async function to execute
            retry_settings (RetrySettings, optional): Retry configuration
            *args: Additional positional arguments -- presumably forwarded
                to callee; TODO confirm
            **kwargs: Additional keyword arguments -- presumably forwarded
                to callee; TODO confirm

        Returns:
            Any: Result of callee execution
        """

    async def stop(self, timeout: float = None):
        """
        Stop the query session pool.

        Args:
            timeout (float, optional): Shutdown timeout
        """
class QuerySession:
    """Asynchronous query session created from a driver."""

    def __init__(self, driver: Driver, settings: QueryClientSettings = None):
        """
        Asynchronous query session.

        Args:
            driver (Driver): Async YDB driver instance
            settings (QueryClientSettings, optional): Session configuration
        """

    async def execute_query(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        tx_control: QueryTxControl = None,
        settings: ExecuteQuerySettings = None
    ) -> AsyncIterator[ResultSet]:
        """
        Execute query asynchronously with streaming results.

        Args:
            query (str): YQL query text
            parameters (Dict[str, Any], optional): Query parameters
            tx_control (QueryTxControl, optional): Transaction control
            settings (ExecuteQuerySettings, optional): Execution settings

        Returns:
            AsyncIterator[ResultSet]: Streaming query results
        """

    async def transaction(
        self,
        tx_settings: QueryTxSettings = None
    ) -> 'AsyncQueryTxContext':
        """
        Begin async query transaction.

        Args:
            tx_settings (QueryTxSettings, optional): Transaction settings

        Returns:
            AsyncQueryTxContext: Async transaction context
        """

    async def close(self):
        """Close async query session."""
class AsyncQueryTxContext:
    """Asynchronous transaction context for the query service."""

    async def __aenter__(self) -> 'AsyncQueryTxContext':
        """Enter async transaction context."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async transaction context."""

    async def execute(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        settings: ExecuteQuerySettings = None
    ) -> AsyncIterator[ResultSet]:
        """
        Execute query within async transaction.

        Args:
            query (str): YQL query text
            parameters (Dict[str, Any], optional): Query parameters
            settings (ExecuteQuerySettings, optional): Execution settings

        Returns:
            AsyncIterator[ResultSet]: Streaming query results
        """

    async def commit(self):
        """Commit async transaction."""

    async def rollback(self):
        """Rollback async transaction."""


# Asynchronous schema and directory operations client.
class SchemeClient:
    """Asynchronous client for directory and path (scheme) operations."""

    def __init__(self, driver: Driver):
        """
        Asynchronous scheme client for schema operations.

        Args:
            driver (Driver): Async YDB driver instance
        """

    async def make_directory(
        self,
        path: str,
        settings: MakeDirectorySettings = None
    ):
        """
        Create directory asynchronously.

        Args:
            path (str): Directory path
            settings (MakeDirectorySettings, optional): Creation settings
        """

    async def remove_directory(
        self,
        path: str,
        settings: RemoveDirectorySettings = None
    ):
        """
        Remove directory asynchronously.

        Args:
            path (str): Directory path
            settings (RemoveDirectorySettings, optional): Removal settings
        """

    async def list_directory(
        self,
        path: str,
        settings: ListDirectorySettings = None
    ) -> Directory:
        """
        List directory contents asynchronously.

        Args:
            path (str): Directory path
            settings (ListDirectorySettings, optional): Listing settings

        Returns:
            Directory: Directory information with entries
        """

    async def describe_path(
        self,
        path: str,
        settings: DescribePathSettings = None
    ) -> SchemeEntry:
        """
        Describe path entry asynchronously.

        Args:
            path (str): Entry path
            settings (DescribePathSettings, optional): Describe settings

        Returns:
            SchemeEntry: Path entry information
        """

    async def modify_permissions(
        self,
        path: str,
        permissions: Permissions,
        settings: ModifyPermissionsSettings = None
    ):
        """
        Modify path permissions asynchronously.

        Args:
            path (str): Entry path
            permissions (Permissions): Permission changes
            settings (ModifyPermissionsSettings, optional): Modify settings
        """


# Asynchronous retry functionality with backoff and error handling.
async def retry_operation(
    callee: Callable[..., Awaitable[Any]],
    retry_settings: RetrySettings = None,
    session_pool: SessionPool = None,
    *args,
    **kwargs
) -> Any:
    """
    Execute async operation with retry logic.

    Because ``retry_settings`` and ``session_pool`` precede ``*args``, the
    second and third positional arguments always bind to those two
    parameters; extra positional arguments for ``callee`` can only follow
    them.

    Args:
        callee (Callable): Async function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        session_pool (SessionPool, optional): Session pool for session-based operations
        *args: Additional arguments for callee
        **kwargs: Additional keyword arguments for callee

    Returns:
        Any: Result of successful callee execution
    """
class AsyncRetrySettings:
    """Retry configuration for asynchronous operations."""

    def __init__(
        self,
        max_retries: int = 10,
        max_session_acquire_timeout: float = None,
        fast_backoff_settings: BackoffSettings = None,
        slow_backoff_settings: BackoffSettings = None,
        **kwargs
    ):
        """
        Retry settings for async operations.

        Args:
            max_retries (int): Maximum number of retry attempts
            max_session_acquire_timeout (float, optional): Session acquisition timeout
            fast_backoff_settings (BackoffSettings, optional): Fast backoff configuration
            slow_backoff_settings (BackoffSettings, optional): Slow backoff configuration
            **kwargs: Additional settings (not described in this reference)
        """


import asyncio
from datetime import datetime

# ``import ydb.aio as ydb_aio`` binds only the name ``ydb_aio``; the examples
# below also use ``ydb.*`` helpers, so the top-level package is imported too.
import ydb
import ydb.aio as ydb_aio


async def main():
    """Connect to a local database, run ``SELECT 1`` through a session pool and print the rows."""
    # Create async driver with context manager
    async with ydb_aio.Driver(
        endpoint="grpc://localhost:2136",
        database="/local",
        credentials=ydb.AnonymousCredentials()
    ) as driver:
        # Wait for driver to be ready
        await driver.wait(fail_fast=True, timeout=5)

        # Create session pool
        async with ydb_aio.SessionPool(driver) as pool:
            # Execute operation
            async def query_operation(session):
                result_sets = await session.execute_query("SELECT 1 AS value")
                return [row.value for row in result_sets[0].rows]

            results = await pool.retry_operation(query_operation)
            print(f"Results: {results}")


# Run async main function (guarded so importing this module has no side effects)
if __name__ == "__main__":
    asyncio.run(main())


async def transfer_funds(session, from_account, to_account, amount):
    """Move ``amount`` from ``from_account`` to ``to_account`` inside one transaction."""
    # Execute multiple queries in async transaction
    async with await session.transaction(ydb.SerializableReadWrite()) as tx:
        # Debit from source account
        await tx.execute(
            """
            UPDATE accounts
            SET balance = balance - $amount
            WHERE account_id = $from_account
            """,
            parameters={
                "$from_account": from_account,
                "$amount": amount
            }
        )
        # Credit to destination account
        await tx.execute(
            """
            UPDATE accounts
            SET balance = balance + $amount
            WHERE account_id = $to_account
            """,
            parameters={
                "$to_account": to_account,
                "$amount": amount
            }
        )
        # Transaction automatically commits on context exit
        # NOTE(review): confirm this against the installed SDK. If the context
        # does not commit on exit, call ``await tx.commit()`` here or pass
        # ``commit_tx=True`` to the final ``execute``.


# Usage with session pool
# (``async with`` is only valid inside a coroutine, hence the wrapper.)
async def run_transfer(pool):
    """Check a session out of ``pool`` and run the sample transfer."""
    async with pool.checkout() as session:
        await transfer_funds(session, "acc1", "acc2", 100.0)


async def execute_analytics_query():
    """Stream a monthly order/revenue aggregation through a query session pool and print each row."""
    async with ydb_aio.Driver(...) as driver:
        query_pool = driver.query_session_pool(size=5)
        async with query_pool as pool:
            async def analytics_operation(session):
                # NOTE(review): verify that DATE_TRUNC and grouping by a
                # SELECT alias are accepted by the target YQL dialect.
                query = """
                SELECT
                DATE_TRUNC('month', created_at) as month,
                COUNT(*) as orders_count,
                SUM(total_amount) as total_revenue
                FROM orders
                WHERE created_at >= $start_date
                GROUP BY month
                ORDER BY month
                """
                parameters = {"$start_date": datetime(2024, 1, 1)}
                async for result_set in session.execute_query(query, parameters):
                    # ``rows`` is iterated with a plain ``for`` (as in
                    # ``main`` above); only the result-set stream is async.
                    for row in result_set.rows:
                        print(f"Month: {row.month}, Orders: {row.orders_count}, Revenue: {row.total_revenue}")

            await pool.retry_operation(analytics_operation)


async def bulk_insert_users(session, user_data):
    """Bulk-upsert ``user_data`` (dicts with ``id``, ``name``, ``email``) into ``/local/users``."""
    # Prepare bulk data
    rows = [
        {"user_id": user["id"], "name": user["name"], "email": user["email"]}
        for user in user_data
    ]
    # Define column types
    column_types = {
        "user_id": ydb.PrimitiveType.Uint64,
        "name": ydb.PrimitiveType.Utf8,
        "email": ydb.PrimitiveType.Utf8
    }
    # Perform bulk upsert asynchronously
    await session.bulk_upsert(
        "/local/users",
        rows,
        column_types=column_types
    )


# Usage
# (``async with`` is only valid inside a coroutine, hence the wrapper.)
async def run_bulk_insert(pool, user_data_list):
    """Check a session out of ``pool`` and bulk-insert ``user_data_list``."""
    async with pool.checkout() as session:
        await bulk_insert_users(session, user_data_list)


async def setup_database_schema():
    """Create the ``/local/app`` directories, list them, then create a ``users`` table."""
    async with ydb_aio.Driver(...) as driver:
        scheme_client = driver.scheme_client()

        # Create directories
        await scheme_client.make_directory("/local/app")
        await scheme_client.make_directory("/local/app/tables")

        # List directory contents
        directory = await scheme_client.list_directory("/local/app")
        for entry in directory.children:
            print(f"Entry: {entry.name}, Type: {entry.type}")

        # Create table through session
        async with ydb_aio.SessionPool(driver) as pool:
            async def create_table_operation(session):
                # NOTE(review): confirm the column class name
                # (``ydb.TableColumn`` vs ``ydb.Column``) against the installed SDK.
                table_description = (
                    ydb.TableDescription()
                    .with_column(ydb.TableColumn("id", ydb.OptionalType(ydb.PrimitiveType.Uint64)))
                    .with_column(ydb.TableColumn("name", ydb.OptionalType(ydb.PrimitiveType.Utf8)))
                    .with_primary_key("id")
                )
                await session.create_table("/local/app/tables/users", table_description)

            await pool.retry_operation(create_table_operation)


# Type aliases for async operations
# Callable that takes a 'Session' (forward reference by name) and returns an awaitable.
AsyncQueryCallback = Callable[['Session'], Awaitable[Any]]
# Callable that takes a 'QuerySession' (forward reference by name) and returns an awaitable.
AsyncQuerySessionCallback = Callable[['QuerySession'], Awaitable[Any]]
# Async iterator yielding ResultSet objects.
AsyncResultIterator = AsyncIterator[ResultSet]
# Common async context managers
# Each alias names an async context manager whose ``async with`` target is
# the bracketed type. These reference the classes directly (not by string),
# so the classes must already be defined when this code runs.
AsyncDriverContext = AsyncContextManager[Driver]
AsyncSessionPoolContext = AsyncContextManager[SessionPool]
AsyncSessionContext = AsyncContextManager[Session]
AsyncTxContextManager = AsyncContextManager[AsyncTxContext]