CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ydb

Officially supported Python client for YDB distributed SQL database

Overview
Eval results
Files

table-operations.mddocs/

Table Operations and Sessions

Comprehensive table operations including session management, transaction handling, query execution, table creation/modification, bulk operations, and point reads.

Capabilities

Session Pool Management

Session pool provides efficient session management with automatic retry, failover, and resource pooling.

class SessionPool:
    def __init__(
        self,
        driver: Driver,
        size: int = None,
        creation_timeout: float = None,
        **kwargs
    ):
        """
        Create session pool for database operations.
        
        Args:
            driver (Driver): YDB driver instance
            size (int, optional): Maximum pool size
            creation_timeout (float, optional): Session creation timeout
        """

    def retry_operation_sync(
        self,
        callee: callable,
        retry_settings: RetrySettings = None,
        *args,
        **kwargs
    ):
        """
        Execute operation with automatic retry and session management.
        
        Args:
            callee (callable): Function to execute with session as first argument
            retry_settings (RetrySettings, optional): Custom retry configuration
            *args: Additional arguments for callee
            **kwargs: Additional keyword arguments for callee
        """

    def acquire(
        self,
        timeout: float = None,
        settings: SessionCheckoutSettings = None
    ) -> Session:
        """
        Acquire session from pool.
        
        Args:
            timeout (float, optional): Acquisition timeout
            settings (SessionCheckoutSettings, optional): Checkout settings
            
        Returns:
            Session: Database session
        """

    def release(self, session: Session):
        """
        Return session to pool.
        
        Args:
            session (Session): Session to return
        """

    def stop(self, timeout: float = None):
        """
        Stop session pool and close all sessions.
        
        Args:
            timeout (float, optional): Shutdown timeout
        """

Database Session Operations

Session provides the interface for executing queries and managing transactions.

class Session:
    def execute_query(
        self,
        query: str,
        parameters: dict = None,
        settings: ExecuteQuerySettings = None
    ):
        """
        Execute YQL query in autocommit mode.
        
        Args:
            query (str): YQL query text
            parameters (dict, optional): Query parameters
            settings (ExecuteQuerySettings, optional): Execution settings
            
        Returns:
            List of result sets
        """

    def transaction(
        self,
        tx_mode: TxMode = None,
        settings: BeginTxSettings = None
    ) -> TxContext:
        """
        Begin transaction context.
        
        Args:
            tx_mode (TxMode, optional): Transaction mode
            settings (BeginTxSettings, optional): Transaction settings
            
        Returns:
            TxContext: Transaction context manager
        """

    def prepare_query(
        self,
        query: str,
        settings: PrepareQuerySettings = None
    ) -> DataQuery:
        """
        Prepare query for multiple executions.
        
        Args:
            query (str): YQL query text
            settings (PrepareQuerySettings, optional): Preparation settings
            
        Returns:
            DataQuery: Prepared query object
        """

    def create_table(
        self,
        path: str,
        table_description: TableDescription,
        settings: CreateTableSettings = None
    ):
        """
        Create new table.
        
        Args:
            path (str): Table path
            table_description (TableDescription): Table schema
            settings (CreateTableSettings, optional): Creation settings
        """

    def alter_table(
        self,
        path: str,
        alter_table_settings: AlterTableSettings
    ):
        """
        Modify existing table schema.
        
        Args:
            path (str): Table path
            alter_table_settings (AlterTableSettings): Modification settings
        """

    def drop_table(
        self,
        path: str,
        settings: DropTableSettings = None
    ):
        """
        Delete table.
        
        Args:
            path (str): Table path
            settings (DropTableSettings, optional): Deletion settings
        """

    def describe_table(
        self,
        path: str,
        settings: DescribeTableSettings = None
    ) -> TableDescription:
        """
        Get table schema information.
        
        Args:
            path (str): Table path
            settings (DescribeTableSettings, optional): Description settings
            
        Returns:
            TableDescription: Table schema details
        """

    def bulk_upsert(
        self,
        path: str,
        rows: List[dict],
        settings: BulkUpsertSettings = None
    ):
        """
        Bulk insert/update rows.
        
        Args:
            path (str): Table path
            rows (List[dict]): Row data
            settings (BulkUpsertSettings, optional): Upsert settings
        """

    def read_table(
        self,
        path: str,
        key_range: KeyRange = None,
        columns: List[str] = None,
        settings: ReadTableSettings = None
    ):
        """
        Read table data with streaming.
        
        Args:
            path (str): Table path
            key_range (KeyRange, optional): Key range filter
            columns (List[str], optional): Columns to read
            settings (ReadTableSettings, optional): Read settings
            
        Yields:
            Result sets with table data
        """

Transaction Context

Transaction context manager for executing multiple operations atomically.

class TxContext:
    def execute(
        self,
        query: str,
        parameters: dict = None,
        commit_tx: bool = False,
        settings: ExecuteQuerySettings = None
    ):
        """
        Execute query within transaction.
        
        Args:
            query (str): YQL query text
            parameters (dict, optional): Query parameters
            commit_tx (bool): Auto-commit after execution
            settings (ExecuteQuerySettings, optional): Execution settings
            
        Returns:
            List of result sets
        """

    def commit(self, settings: CommitTxSettings = None):
        """
        Commit transaction.
        
        Args:
            settings (CommitTxSettings, optional): Commit settings
        """

    def rollback(self, settings: RollbackTxSettings = None):
        """
        Rollback transaction.
        
        Args:
            settings (RollbackTxSettings, optional): Rollback settings
        """

Prepared Queries

Prepared queries for efficient repeated execution.

class DataQuery:
    def execute(
        self,
        parameters: dict = None,
        settings: ExecuteQuerySettings = None
    ):
        """
        Execute prepared query.
        
        Args:
            parameters (dict, optional): Query parameters
            settings (ExecuteQuerySettings, optional): Execution settings
            
        Returns:
            List of result sets
        """

    @property
    def query_id(self) -> str:
        """Get query identifier for caching."""

Table Schema Definitions

Classes for defining table structure and constraints.

class TableDescription:
    def __init__(self):
        """Create empty table description."""

    def with_column(self, column: TableColumn) -> 'TableDescription':
        """
        Add column to table definition.
        
        Args:
            column (TableColumn): Column definition
            
        Returns:
            TableDescription: Self for chaining
        """

    def with_primary_key(self, *key_names: str) -> 'TableDescription':
        """
        Set primary key columns.
        
        Args:
            *key_names (str): Primary key column names
            
        Returns:
            TableDescription: Self for chaining
        """

    def with_index(self, index: TableIndex) -> 'TableDescription':
        """
        Add secondary index.
        
        Args:
            index (TableIndex): Index definition
            
        Returns:
            TableDescription: Self for chaining
        """

class TableColumn:
    def __init__(self, name: str, type_: Type, family: str = None):
        """
        Define table column.
        
        Args:
            name (str): Column name
            type_ (Type): Column data type
            family (str, optional): Column family name
        """

class TableIndex:
    def __init__(
        self,
        name: str,
        index_columns: List[str],
        data_columns: List[str] = None
    ):
        """
        Define secondary index.
        
        Args:
            name (str): Index name
            index_columns (List[str]): Indexed columns
            data_columns (List[str], optional): Additional data columns
        """

Transaction Modes

Transaction isolation levels and modes.

class TxMode:
    """Transaction modes for different consistency levels."""
    
    SERIALIZABLE_RW: TxMode      # Serializable read-write
    ONLINE_RO: TxMode           # Online read-only
    STALE_RO: TxMode            # Stale read-only  
    SNAPSHOT_RO: TxMode         # Snapshot read-only

class OnlineReadOnlyTxMode:
    """Online read-only transaction modes."""
    
    def __init__(self, allow_inconsistent_reads: bool = False):
        """
        Configure online read-only mode.
        
        Args:
            allow_inconsistent_reads (bool): Allow inconsistent reads
        """

class StalenessMode:
    """Staleness modes for stale read-only transactions."""
    
    def __init__(self, max_staleness: int, unit: Unit = Unit.SECONDS):
        """
        Configure staleness parameters.
        
        Args:
            max_staleness (int): Maximum staleness value
            unit (Unit): Time unit for staleness
        """

Key Range Operations

Define key ranges for point reads and scans.

class KeyRange:
    def __init__(
        self,
        from_bound: KeyBound = None,
        to_bound: KeyBound = None
    ):
        """
        Define key range for table operations.
        
        Args:
            from_bound (KeyBound, optional): Lower bound
            to_bound (KeyBound, optional): Upper bound
        """

    @classmethod
    def prefix(cls, prefix_tuple: tuple) -> 'KeyRange':
        """
        Create range for key prefix.
        
        Args:
            prefix_tuple (tuple): Key prefix values
            
        Returns:
            KeyRange: Range matching prefix
        """

    @classmethod
    def point(cls, key_tuple: tuple) -> 'KeyRange':
        """
        Create range for single key.
        
        Args:
            key_tuple (tuple): Exact key values
            
        Returns:
            KeyRange: Point range
        """

class KeyBound:
    def __init__(self, key_tuple: tuple, is_inclusive: bool = True):
        """
        Define key boundary.
        
        Args:
            key_tuple (tuple): Key values
            is_inclusive (bool): Include boundary in range
        """

Usage Examples

Basic Table Operations

import ydb

# Setup driver and session pool
driver = ydb.Driver(
    endpoint="grpc://localhost:2136",
    database="/local",
    credentials=ydb.AnonymousCredentials()
)
driver.wait(fail_fast=True)

session_pool = ydb.SessionPool(driver)

def create_and_populate_table(session):
    # Create table
    session.create_table(
        '/local/users',
        ydb.TableDescription()
        .with_column(ydb.TableColumn('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)))
        .with_column(ydb.TableColumn('name', ydb.OptionalType(ydb.PrimitiveType.Utf8)))
        .with_column(ydb.TableColumn('email', ydb.OptionalType(ydb.PrimitiveType.Utf8)))
        .with_primary_key('id')
    )
    
    # Insert data
    session.transaction().execute(
        """
        INSERT INTO users (id, name, email)
        VALUES (1, "Alice", "alice@example.com"),
               (2, "Bob", "bob@example.com");
        """,
        commit_tx=True
    )

# Execute with retry
session_pool.retry_operation_sync(create_and_populate_table)

Query Execution with Parameters

def query_users(session):
    # Parameterized query
    result_sets = session.transaction().execute(
        """
        DECLARE $min_id AS Uint64;
        SELECT id, name, email 
        FROM users 
        WHERE id >= $min_id
        ORDER BY id;
        """,
        parameters={'$min_id': 1},
        commit_tx=True
    )
    
    for result_set in result_sets:
        for row in result_set.rows:
            print(f"User: {row.name} ({row.email})")

session_pool.retry_operation_sync(query_users)

Bulk Operations

def bulk_insert_users(session):
    # Bulk upsert for efficient large data inserts
    users_data = [
        {'id': 3, 'name': 'Charlie', 'email': 'charlie@example.com'},
        {'id': 4, 'name': 'Diana', 'email': 'diana@example.com'},
        {'id': 5, 'name': 'Eve', 'email': 'eve@example.com'},
    ]
    
    session.bulk_upsert('/local/users', users_data)

session_pool.retry_operation_sync(bulk_insert_users)

Transaction Management

def transfer_operation(session):
    # Multi-statement transaction
    tx = session.transaction(ydb.SerializableReadWrite())
    
    try:
        # Check balance
        result_sets = tx.execute(
            """
            DECLARE $from_id AS Uint64;
            SELECT balance FROM accounts WHERE id = $from_id;
            """,
            parameters={'$from_id': 1}
        )
        
        balance = result_sets[0].rows[0].balance
        if balance < 100:
            raise ValueError("Insufficient funds")
        
        # Perform transfer
        tx.execute(
            """
            DECLARE $from_id AS Uint64;
            DECLARE $to_id AS Uint64;  
            DECLARE $amount AS Uint64;
            
            UPDATE accounts SET balance = balance - $amount WHERE id = $from_id;
            UPDATE accounts SET balance = balance + $amount WHERE id = $to_id;
            """,
            parameters={'$from_id': 1, '$to_id': 2, '$amount': 100}
        )
        
        # Commit transaction
        tx.commit()
        
    except Exception:
        # Rollback on error
        tx.rollback()
        raise

session_pool.retry_operation_sync(transfer_operation)

Prepared Queries

def use_prepared_query(session):
    # Prepare query once
    prepared_query = session.prepare_query(
        """
        DECLARE $user_id AS Uint64;
        SELECT name, email FROM users WHERE id = $user_id;
        """
    )
    
    # Execute multiple times with different parameters
    for user_id in [1, 2, 3]:
        result_sets = prepared_query.execute(
            parameters={'$user_id': user_id}
        )
        
        for result_set in result_sets:
            for row in result_set.rows:
                print(f"User {user_id}: {row.name}")

session_pool.retry_operation_sync(use_prepared_query)

Table Streaming Reads

def stream_table_data(session):
    # Stream large table efficiently
    for result_set in session.read_table('/local/large_table'):
        for row in result_set.rows:
            # Process row
            process_row(row)

def process_row(row):
    # Process individual row
    print(f"Processing: {row}")

session_pool.retry_operation_sync(stream_table_data)

Types

from typing import List, Dict, Any, Optional, Iterator

# Settings classes for operations
class SessionCheckoutSettings:
    def __init__(self, timeout: float = None): ...

class ExecuteQuerySettings:
    def __init__(self, **kwargs): ...

class PrepareQuerySettings:
    def __init__(self, **kwargs): ...

class CreateTableSettings:
    def __init__(self, **kwargs): ...

class AlterTableSettings:
    def __init__(self, **kwargs): ...

class DropTableSettings:
    def __init__(self, **kwargs): ...

class DescribeTableSettings:
    def __init__(self, **kwargs): ...

class BulkUpsertSettings:
    def __init__(self, **kwargs): ...

class ReadTableSettings:
    def __init__(self, **kwargs): ...

class BeginTxSettings:
    def __init__(self, **kwargs): ...

class CommitTxSettings:
    def __init__(self, **kwargs): ...

class RollbackTxSettings:
    def __init__(self, **kwargs): ...

# Type aliases
QueryText = str
Parameters = Dict[str, Any]
TablePath = str
ColumnName = str

Install with Tessl CLI

npx tessl i tessl/pypi-ydb

docs

async-operations.md

authentication.md

data-types.md

dbapi-interface.md

driver-connection.md

error-handling.md

index.md

query-service.md

schema-operations.md

sqlalchemy-integration.md

table-operations.md

topic-operations.md

tile.json