Officially supported Python client for YDB distributed SQL database
Comprehensive table operations including session management, transaction handling, query execution, table creation/modification, bulk operations, and point reads.
Session pool provides efficient session management with automatic retry, failover, and resource pooling.
class SessionPool:
def __init__(
self,
driver: Driver,
size: int = None,
creation_timeout: float = None,
**kwargs
):
"""
Create session pool for database operations.
Args:
driver (Driver): YDB driver instance
size (int, optional): Maximum pool size
creation_timeout (float, optional): Session creation timeout
"""
def retry_operation_sync(
self,
callee: callable,
retry_settings: RetrySettings = None,
*args,
**kwargs
):
"""
Execute operation with automatic retry and session management.
Args:
callee (callable): Function to execute with session as first argument
retry_settings (RetrySettings, optional): Custom retry configuration
*args: Additional arguments for callee
**kwargs: Additional keyword arguments for callee
"""
def acquire(
self,
timeout: float = None,
settings: SessionCheckoutSettings = None
) -> Session:
"""
Acquire session from pool.
Args:
timeout (float, optional): Acquisition timeout
settings (SessionCheckoutSettings, optional): Checkout settings
Returns:
Session: Database session
"""
def release(self, session: Session):
"""
Return session to pool.
Args:
session (Session): Session to return
"""
def stop(self, timeout: float = None):
"""
Stop session pool and close all sessions.
Args:
timeout (float, optional): Shutdown timeout
"""Session provides the interface for executing queries and managing transactions.
class Session:
def execute_query(
self,
query: str,
parameters: dict = None,
settings: ExecuteQuerySettings = None
):
"""
Execute YQL query in autocommit mode.
Args:
query (str): YQL query text
parameters (dict, optional): Query parameters
settings (ExecuteQuerySettings, optional): Execution settings
Returns:
List of result sets
"""
def transaction(
self,
tx_mode: TxMode = None,
settings: BeginTxSettings = None
) -> TxContext:
"""
Begin transaction context.
Args:
tx_mode (TxMode, optional): Transaction mode
settings (BeginTxSettings, optional): Transaction settings
Returns:
TxContext: Transaction context manager
"""
def prepare_query(
self,
query: str,
settings: PrepareQuerySettings = None
) -> DataQuery:
"""
Prepare query for multiple executions.
Args:
query (str): YQL query text
settings (PrepareQuerySettings, optional): Preparation settings
Returns:
DataQuery: Prepared query object
"""
def create_table(
self,
path: str,
table_description: TableDescription,
settings: CreateTableSettings = None
):
"""
Create new table.
Args:
path (str): Table path
table_description (TableDescription): Table schema
settings (CreateTableSettings, optional): Creation settings
"""
def alter_table(
self,
path: str,
alter_table_settings: AlterTableSettings
):
"""
Modify existing table schema.
Args:
path (str): Table path
alter_table_settings (AlterTableSettings): Modification settings
"""
def drop_table(
self,
path: str,
settings: DropTableSettings = None
):
"""
Delete table.
Args:
path (str): Table path
settings (DropTableSettings, optional): Deletion settings
"""
def describe_table(
self,
path: str,
settings: DescribeTableSettings = None
) -> TableDescription:
"""
Get table schema information.
Args:
path (str): Table path
settings (DescribeTableSettings, optional): Description settings
Returns:
TableDescription: Table schema details
"""
def bulk_upsert(
self,
path: str,
rows: List[dict],
settings: BulkUpsertSettings = None
):
"""
Bulk insert/update rows.
Args:
path (str): Table path
rows (List[dict]): Row data
settings (BulkUpsertSettings, optional): Upsert settings
"""
def read_table(
self,
path: str,
key_range: KeyRange = None,
columns: List[str] = None,
settings: ReadTableSettings = None
):
"""
Read table data with streaming.
Args:
path (str): Table path
key_range (KeyRange, optional): Key range filter
columns (List[str], optional): Columns to read
settings (ReadTableSettings, optional): Read settings
Yields:
Result sets with table data
"""Transaction context manager for executing multiple operations atomically.
class TxContext:
def execute(
self,
query: str,
parameters: dict = None,
commit_tx: bool = False,
settings: ExecuteQuerySettings = None
):
"""
Execute query within transaction.
Args:
query (str): YQL query text
parameters (dict, optional): Query parameters
commit_tx (bool): Auto-commit after execution
settings (ExecuteQuerySettings, optional): Execution settings
Returns:
List of result sets
"""
def commit(self, settings: CommitTxSettings = None):
"""
Commit transaction.
Args:
settings (CommitTxSettings, optional): Commit settings
"""
def rollback(self, settings: RollbackTxSettings = None):
"""
Rollback transaction.
Args:
settings (RollbackTxSettings, optional): Rollback settings
"""Prepared queries for efficient repeated execution.
class DataQuery:
def execute(
self,
parameters: dict = None,
settings: ExecuteQuerySettings = None
):
"""
Execute prepared query.
Args:
parameters (dict, optional): Query parameters
settings (ExecuteQuerySettings, optional): Execution settings
Returns:
List of result sets
"""
@property
def query_id(self) -> str:
"""Get query identifier for caching."""Classes for defining table structure and constraints.
class TableDescription:
def __init__(self):
"""Create empty table description."""
def with_column(self, column: TableColumn) -> 'TableDescription':
"""
Add column to table definition.
Args:
column (TableColumn): Column definition
Returns:
TableDescription: Self for chaining
"""
def with_primary_key(self, *key_names: str) -> 'TableDescription':
"""
Set primary key columns.
Args:
*key_names (str): Primary key column names
Returns:
TableDescription: Self for chaining
"""
def with_index(self, index: TableIndex) -> 'TableDescription':
"""
Add secondary index.
Args:
index (TableIndex): Index definition
Returns:
TableDescription: Self for chaining
"""
class TableColumn:
def __init__(self, name: str, type_: Type, family: str = None):
"""
Define table column.
Args:
name (str): Column name
type_ (Type): Column data type
family (str, optional): Column family name
"""
class TableIndex:
def __init__(
self,
name: str,
index_columns: List[str],
data_columns: List[str] = None
):
"""
Define secondary index.
Args:
name (str): Index name
index_columns (List[str]): Indexed columns
data_columns (List[str], optional): Additional data columns
"""Transaction isolation levels and modes.
class TxMode:
"""Transaction modes for different consistency levels."""
SERIALIZABLE_RW: TxMode # Serializable read-write
ONLINE_RO: TxMode # Online read-only
STALE_RO: TxMode # Stale read-only
SNAPSHOT_RO: TxMode # Snapshot read-only
class OnlineReadOnlyTxMode:
"""Online read-only transaction modes."""
def __init__(self, allow_inconsistent_reads: bool = False):
"""
Configure online read-only mode.
Args:
allow_inconsistent_reads (bool): Allow inconsistent reads
"""
class StalenessMode:
"""Staleness modes for stale read-only transactions."""
def __init__(self, max_staleness: int, unit: Unit = Unit.SECONDS):
"""
Configure staleness parameters.
Args:
max_staleness (int): Maximum staleness value
unit (Unit): Time unit for staleness
"""Define key ranges for point reads and scans.
class KeyRange:
def __init__(
self,
from_bound: KeyBound = None,
to_bound: KeyBound = None
):
"""
Define key range for table operations.
Args:
from_bound (KeyBound, optional): Lower bound
to_bound (KeyBound, optional): Upper bound
"""
@classmethod
def prefix(cls, prefix_tuple: tuple) -> 'KeyRange':
"""
Create range for key prefix.
Args:
prefix_tuple (tuple): Key prefix values
Returns:
KeyRange: Range matching prefix
"""
@classmethod
def point(cls, key_tuple: tuple) -> 'KeyRange':
"""
Create range for single key.
Args:
key_tuple (tuple): Exact key values
Returns:
KeyRange: Point range
"""
class KeyBound:
def __init__(self, key_tuple: tuple, is_inclusive: bool = True):
"""
Define key boundary.
Args:
key_tuple (tuple): Key values
is_inclusive (bool): Include boundary in range
"""import ydb
# Setup driver and session pool
driver = ydb.Driver(
endpoint="grpc://localhost:2136",
database="/local",
credentials=ydb.AnonymousCredentials()
)
driver.wait(fail_fast=True)
session_pool = ydb.SessionPool(driver)
def create_and_populate_table(session):
# Create table
session.create_table(
'/local/users',
ydb.TableDescription()
.with_column(ydb.TableColumn('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)))
.with_column(ydb.TableColumn('name', ydb.OptionalType(ydb.PrimitiveType.Utf8)))
.with_column(ydb.TableColumn('email', ydb.OptionalType(ydb.PrimitiveType.Utf8)))
.with_primary_key('id')
)
# Insert data
session.transaction().execute(
"""
INSERT INTO users (id, name, email)
VALUES (1, "Alice", "alice@example.com"),
(2, "Bob", "bob@example.com");
""",
commit_tx=True
)
# Execute with retry
session_pool.retry_operation_sync(create_and_populate_table)def query_users(session):
# Parameterized query
result_sets = session.transaction().execute(
"""
DECLARE $min_id AS Uint64;
SELECT id, name, email
FROM users
WHERE id >= $min_id
ORDER BY id;
""",
parameters={'$min_id': 1},
commit_tx=True
)
for result_set in result_sets:
for row in result_set.rows:
print(f"User: {row.name} ({row.email})")
session_pool.retry_operation_sync(query_users)def bulk_insert_users(session):
# Bulk upsert for efficient large data inserts
users_data = [
{'id': 3, 'name': 'Charlie', 'email': 'charlie@example.com'},
{'id': 4, 'name': 'Diana', 'email': 'diana@example.com'},
{'id': 5, 'name': 'Eve', 'email': 'eve@example.com'},
]
session.bulk_upsert('/local/users', users_data)
session_pool.retry_operation_sync(bulk_insert_users)def transfer_operation(session):
# Multi-statement transaction
tx = session.transaction(ydb.SerializableReadWrite())
try:
# Check balance
result_sets = tx.execute(
"""
DECLARE $from_id AS Uint64;
SELECT balance FROM accounts WHERE id = $from_id;
""",
parameters={'$from_id': 1}
)
balance = result_sets[0].rows[0].balance
if balance < 100:
raise ValueError("Insufficient funds")
# Perform transfer
tx.execute(
"""
DECLARE $from_id AS Uint64;
DECLARE $to_id AS Uint64;
DECLARE $amount AS Uint64;
UPDATE accounts SET balance = balance - $amount WHERE id = $from_id;
UPDATE accounts SET balance = balance + $amount WHERE id = $to_id;
""",
parameters={'$from_id': 1, '$to_id': 2, '$amount': 100}
)
# Commit transaction
tx.commit()
except Exception:
# Rollback on error
tx.rollback()
raise
session_pool.retry_operation_sync(transfer_operation)def use_prepared_query(session):
# Prepare query once
prepared_query = session.prepare_query(
"""
DECLARE $user_id AS Uint64;
SELECT name, email FROM users WHERE id = $user_id;
"""
)
# Execute multiple times with different parameters
for user_id in [1, 2, 3]:
result_sets = prepared_query.execute(
parameters={'$user_id': user_id}
)
for result_set in result_sets:
for row in result_set.rows:
print(f"User {user_id}: {row.name}")
session_pool.retry_operation_sync(use_prepared_query)def stream_table_data(session):
# Stream large table efficiently
for result_set in session.read_table('/local/large_table'):
for row in result_set.rows:
# Process row
process_row(row)
def process_row(row):
# Process individual row
print(f"Processing: {row}")
session_pool.retry_operation_sync(stream_table_data)from typing import List, Dict, Any, Optional, Iterator
# Settings classes for operations
class SessionCheckoutSettings:
def __init__(self, timeout: float = None): ...
class ExecuteQuerySettings:
def __init__(self, **kwargs): ...
class PrepareQuerySettings:
def __init__(self, **kwargs): ...
class CreateTableSettings:
def __init__(self, **kwargs): ...
class AlterTableSettings:
def __init__(self, **kwargs): ...
class DropTableSettings:
def __init__(self, **kwargs): ...
class DescribeTableSettings:
def __init__(self, **kwargs): ...
class BulkUpsertSettings:
def __init__(self, **kwargs): ...
class ReadTableSettings:
def __init__(self, **kwargs): ...
class BeginTxSettings:
def __init__(self, **kwargs): ...
class CommitTxSettings:
def __init__(self, **kwargs): ...
class RollbackTxSettings:
def __init__(self, **kwargs): ...
# Type aliases
QueryText = str
Parameters = Dict[str, Any]
TablePath = str
ColumnName = str