CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ydb

Officially supported Python client for YDB distributed SQL database

Overview
Eval results
Files

sqlalchemy-integration.mddocs/

SQLAlchemy Integration

SQLAlchemy dialect for YDB enabling ORM usage, custom YDB types, connection management, and standard SQLAlchemy operations.

Capabilities

YDB SQLAlchemy Dialect

The YDB SQLAlchemy dialect enables using YDB with SQLAlchemy ORM and core functionality.

import ydb.sqlalchemy

class YqlDialect(DefaultDialect):
    """
    SQLAlchemy dialect for YDB (YQL).
    
    Provides SQLAlchemy integration for YDB database operations
    including ORM support, query compilation, and type mapping.
    """
    
    name = "yql"
    supports_alter = False
    max_identifier_length = 63
    supports_sane_rowcount = False
    supports_statement_cache = False
    
    supports_native_enum = False
    supports_native_boolean = True
    supports_smallserial = False
    
    supports_sequences = False
    sequences_optional = True
    preexecute_autoincrement_sequences = True
    postfetch_lastrowid = False
    
    supports_default_values = False
    supports_empty_insert = False
    supports_multivalues_insert = True
    default_paramstyle = "qmark"
    
    isolation_level = None
    
    @staticmethod
    def dbapi():
        """
        Get YDB DB-API module.
        
        Returns:
            module: YDB DB-API 2.0 module
        """
        import ydb.dbapi
        return ydb.dbapi

    def get_columns(
        self,
        connection,
        table_name: str,
        schema: str = None,
        **kwargs
    ) -> List[Dict]:
        """
        Get table column information for SQLAlchemy reflection.
        
        Args:
            connection: SQLAlchemy connection
            table_name (str): Table name
            schema (str, optional): Schema name (not supported)
            **kwargs: Additional arguments
            
        Returns:
            List[Dict]: Column information dictionaries
        """

    def has_table(
        self,
        connection,
        table_name: str,
        schema: str = None
    ) -> bool:
        """
        Check if table exists.
        
        Args:
            connection: SQLAlchemy connection
            table_name (str): Table name to check
            schema (str, optional): Schema name (not supported)
            
        Returns:
            bool: True if table exists
        """

def register_dialect(
    name: str = "yql",
    module: str = None,
    cls: str = "YqlDialect"
):
    """
    Register YDB SQLAlchemy dialect.
    
    Args:
        name (str): Dialect name
        module (str, optional): Module path
        cls (str): Dialect class name
    """

YDB-Specific SQLAlchemy Types

Custom SQLAlchemy types for YDB-specific data types.

from sqlalchemy.types import Integer, TypeDecorator

class UInt8(Integer):
    """
    SQLAlchemy type for YDB UInt8.
    
    Maps to YDB's 8-bit unsigned integer type.
    """
    
    def __init__(self):
        """Create UInt8 type."""
        super().__init__()

class UInt32(Integer):
    """
    SQLAlchemy type for YDB UInt32.
    
    Maps to YDB's 32-bit unsigned integer type.
    """
    
    def __init__(self):
        """Create UInt32 type."""
        super().__init__()

class UInt64(Integer):
    """
    SQLAlchemy type for YDB UInt64.
    
    Maps to YDB's 64-bit unsigned integer type.
    """
    
    def __init__(self):
        """Create UInt64 type."""
        super().__init__()

class YdbDateTime(TypeDecorator):
    """
    SQLAlchemy type decorator for YDB datetime handling.
    
    Handles conversion between Python datetime and YDB datetime types.
    """
    
    impl = sa.DATETIME
    cache_ok = True

class YdbDecimal(TypeDecorator):
    """
    SQLAlchemy type decorator for YDB decimal type.
    
    Handles YDB decimal precision and scale requirements.
    """
    
    impl = sa.DECIMAL
    cache_ok = True
    
    def __init__(self, precision: int = 22, scale: int = 9):
        """
        Create YDB decimal type.
        
        Args:
            precision (int): Decimal precision (max 35)
            scale (int): Decimal scale (max precision)
        """
        super().__init__(precision=precision, scale=scale)

class YdbJson(TypeDecorator):
    """
    SQLAlchemy type decorator for YDB JSON types.
    
    Handles both Json and JsonDocument YDB types.
    """
    
    impl = sa.JSON
    cache_ok = True

Type Compiler

Custom type compiler for translating SQLAlchemy types to YQL types.

class YqlTypeCompiler(GenericTypeCompiler):
    """
    Type compiler for converting SQLAlchemy types to YQL.
    """
    
    def visit_VARCHAR(self, type_, **kwargs) -> str:
        """
        Convert VARCHAR to YQL STRING type.
        
        Args:
            type_: SQLAlchemy VARCHAR type
            **kwargs: Additional arguments
            
        Returns:
            str: YQL type string
        """
        return "STRING"

    def visit_unicode(self, type_, **kwargs) -> str:
        """
        Convert Unicode to YQL UTF8 type.
        
        Args:
            type_: SQLAlchemy Unicode type
            **kwargs: Additional arguments
            
        Returns:
            str: YQL type string
        """
        return "UTF8"

    def visit_TEXT(self, type_, **kwargs) -> str:
        """
        Convert TEXT to YQL UTF8 type.
        
        Args:
            type_: SQLAlchemy TEXT type
            **kwargs: Additional arguments
            
        Returns:
            str: YQL type string
        """
        return "UTF8"

    def visit_BOOLEAN(self, type_, **kwargs) -> str:
        """
        Convert BOOLEAN to YQL BOOL type.
        
        Args:
            type_: SQLAlchemy BOOLEAN type
            **kwargs: Additional arguments
            
        Returns:
            str: YQL type string
        """
        return "BOOL"

    def visit_FLOAT(self, type_, **kwargs) -> str:
        """
        Convert FLOAT to YQL DOUBLE type.
        
        Args:
            type_: SQLAlchemy FLOAT type
            **kwargs: Additional arguments
            
        Returns:
            str: YQL type string
        """
        return "DOUBLE"

    def visit_uint32(self, type_, **kwargs) -> str:
        """
        Convert UInt32 to YQL UInt32 type.
        
        Args:
            type_: YDB UInt32 type
            **kwargs: Additional arguments
            
        Returns:
            str: YQL type string
        """
        return "UInt32"

    def visit_uint64(self, type_, **kwargs) -> str:
        """
        Convert UInt64 to YQL UInt64 type.
        
        Args:
            type_: YDB UInt64 type
            **kwargs: Additional arguments
            
        Returns:
            str: YQL type string
        """
        return "UInt64"

    def visit_uint8(self, type_, **kwargs) -> str:
        """
        Convert UInt8 to YQL UInt8 type.
        
        Args:
            type_: YDB UInt8 type
            **kwargs: Additional arguments
            
        Returns:
            str: YQL type string
        """
        return "UInt8"

Query Compiler

Custom SQL compiler for generating YQL from SQLAlchemy expressions.

class YqlCompiler(SQLCompiler):
    """
    SQL compiler for generating YQL queries from SQLAlchemy expressions.
    """
    
    def group_by_clause(self, select, **kwargs):
        """
        Generate GROUP BY clause for YQL.
        
        Args:
            select: SQLAlchemy select object
            **kwargs: Additional arguments
            
        Returns:
            str: YQL GROUP BY clause
        """
        kwargs.update(within_columns_clause=True)
        return super().group_by_clause(select, **kwargs)

    def visit_function(self, func, add_to_result_map=None, **kwargs):
        """
        Visit function expressions and convert to YQL syntax.
        
        Args:
            func: SQLAlchemy function expression
            add_to_result_map: Result map callback
            **kwargs: Additional arguments
            
        Returns:
            str: YQL function call
        """
        # Handle YQL namespace syntax (::) instead of SQL (.)
        disp = getattr(self, f"visit_{func.name.lower()}_func", None)
        if disp:
            return disp(func, **kwargs)
        
        # Convert function names to YQL format
        name = func.name
        if hasattr(func, 'packagenames') and func.packagenames:
            name = "::".join(func.packagenames + [name])
        
        return f"{name}{self.function_argspec(func, **kwargs)}"

    def visit_lambda(self, lambda_, **kwargs):
        """
        Visit lambda expressions for YQL.
        
        Args:
            lambda_: Lambda expression
            **kwargs: Additional arguments
            
        Returns:
            str: YQL lambda expression
        """
        func = lambda_.func
        spec = inspect_getfullargspec(func)
        
        # Build YQL lambda syntax: ($arg1, $arg2) -> { RETURN expression; }
        args_str = "(" + ", ".join(f"${arg}" for arg in spec.args) + ")"
        
        # Create literal columns for lambda parameters
        args = [literal_column(f"${arg}") for arg in spec.args]
        body_expr = func(*args)
        body_str = self.process(body_expr, **kwargs)
        
        return f"{args_str} -> {{ RETURN {body_str}; }}"

class ParametrizedFunction(functions.Function):
    """
    SQLAlchemy function with YQL-style parameters.
    
    Supports YQL functions that take type parameters.
    """
    
    __visit_name__ = "parametrized_function"
    
    def __init__(self, name: str, params: List, *args, **kwargs):
        """
        Create parametrized function.
        
        Args:
            name (str): Function name
            params (List): Type parameters
            *args: Function arguments
            **kwargs: Additional arguments
        """
        super().__init__(name, *args, **kwargs)
        self._func_name = name
        self._func_params = params
        self.params_expr = ClauseList(
            operator=functions.operators.comma_op,
            group_contents=True,
            *params
        ).self_group()

Identifier Preparer

Custom identifier handling for YQL naming conventions.

class YqlIdentifierPreparer(IdentifierPreparer):
    """
    Identifier preparer for YQL naming conventions.
    
    Handles quoting and escaping of identifiers in YQL.
    """
    
    def __init__(self, dialect):
        """
        Create YQL identifier preparer.
        
        Args:
            dialect: YQL dialect instance
        """
        super().__init__(
            dialect,
            initial_quote="`",
            final_quote="`",
        )

    def _requires_quotes(self, value: str) -> bool:
        """
        Determine if identifier requires quoting.
        
        Args:
            value (str): Identifier value
            
        Returns:
            bool: True if quoting is required
        """
        # Force quoting unless already quoted
        return not (
            value.startswith(self.initial_quote) and 
            value.endswith(self.final_quote)
        )

    def quote_identifier(self, value: str) -> str:
        """
        Quote identifier for YQL.
        
        Args:
            value (str): Identifier to quote
            
        Returns:
            str: Quoted identifier
        """
        if self._requires_quotes(value):
            return f"{self.initial_quote}{value}{self.final_quote}"
        return value

Connection and Engine Creation

Utilities for creating SQLAlchemy engines and connections to YDB.

def create_ydb_engine(
    endpoint: str,
    database: str,
    credentials: ydb.Credentials = None,
    **engine_kwargs
) -> sa.Engine:
    """
    Create SQLAlchemy engine for YDB.
    
    Args:
        endpoint (str): YDB endpoint URL
        database (str): Database path
        credentials (ydb.Credentials, optional): Authentication credentials
        **engine_kwargs: Additional engine arguments
        
    Returns:
        sa.Engine: SQLAlchemy engine configured for YDB
    """
    # Register YDB dialect
    register_dialect()
    
    # Build connection string
    connection_string = f"yql://{endpoint.replace('grpc://', '')}{database}"
    
    # Create engine with YDB-specific settings
    engine = sa.create_engine(
        connection_string,
        credentials=credentials,
        **engine_kwargs
    )
    
    return engine

def get_ydb_metadata(engine: sa.Engine) -> sa.MetaData:
    """
    Get SQLAlchemy metadata with YDB table reflection.
    
    Args:
        engine (sa.Engine): YDB SQLAlchemy engine
        
    Returns:
        sa.MetaData: Metadata with reflected tables
    """
    metadata = sa.MetaData()
    metadata.reflect(bind=engine)
    return metadata

class YdbEngineBuilder:
    """
    Builder for YDB SQLAlchemy engines with configuration.
    """
    
    def __init__(self):
        """Create engine builder."""
        self.endpoint = None
        self.database = None
        self.credentials = None
        self.engine_options = {}
    
    def with_endpoint(self, endpoint: str) -> 'YdbEngineBuilder':
        """
        Set YDB endpoint.
        
        Args:
            endpoint (str): YDB endpoint URL
            
        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.endpoint = endpoint
        return self
    
    def with_database(self, database: str) -> 'YdbEngineBuilder':
        """
        Set database path.
        
        Args:
            database (str): Database path
            
        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.database = database
        return self
    
    def with_credentials(self, credentials: ydb.Credentials) -> 'YdbEngineBuilder':
        """
        Set authentication credentials.
        
        Args:
            credentials (ydb.Credentials): YDB credentials
            
        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.credentials = credentials
        return self
    
    def with_pool_size(self, size: int) -> 'YdbEngineBuilder':
        """
        Set connection pool size.
        
        Args:
            size (int): Pool size
            
        Returns:
            YdbEngineBuilder: Self for chaining
        """
        self.engine_options['pool_size'] = size
        return self
    
    def build(self) -> sa.Engine:
        """
        Build SQLAlchemy engine.
        
        Returns:
            sa.Engine: Configured YDB engine
        """
        if not self.endpoint or not self.database:
            raise ValueError("Endpoint and database are required")
        
        return create_ydb_engine(
            self.endpoint,
            self.database,
            self.credentials,
            **self.engine_options
        )

Usage Examples

Basic SQLAlchemy Setup

import sqlalchemy as sa
import ydb.sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Register YDB dialect
ydb.sqlalchemy.register_dialect()

# Create engine
engine = sa.create_engine("yql://localhost:2136/local")

# Create metadata and base class
Base = declarative_base()
metadata = sa.MetaData()

# Define ORM model
class User(Base):
    __tablename__ = 'users'
    
    id = sa.Column(ydb.sqlalchemy.UInt64, primary_key=True)
    name = sa.Column(sa.Unicode(255), nullable=False)
    email = sa.Column(sa.Unicode(255), unique=True)
    age = sa.Column(ydb.sqlalchemy.UInt32)
    created_at = sa.Column(sa.DateTime, default=sa.func.CurrentUtcDatetime())
    is_active = sa.Column(sa.Boolean, default=True)
    metadata_json = sa.Column(sa.JSON)
    
    def __repr__(self):
        return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"

# Create session factory
Session = sessionmaker(bind=engine)

ORM Operations

def demonstrate_orm_operations():
    """Demonstrate basic ORM operations with YDB."""
    
    session = Session()
    
    try:
        # Create new user
        new_user = User(
            id=1001,
            name="Alice Johnson",
            email="alice@example.com",
            age=25,
            metadata_json={"department": "engineering", "level": "senior"}
        )
        
        session.add(new_user)
        session.commit()
        
        # Query users
        users = session.query(User).filter(User.age > 21).all()
        print(f"Found {len(users)} users over 21")
        
        # Update user
        user = session.query(User).filter(User.email == "alice@example.com").first()
        if user:
            user.age = 26
            user.metadata_json = {"department": "engineering", "level": "lead"}
            session.commit()
            print(f"Updated user: {user}")
        
        # Aggregate query
        avg_age = session.query(sa.func.avg(User.age)).scalar()
        print(f"Average age: {avg_age}")
        
        # Complex query with joins and filters
        active_engineers = session.query(User).filter(
            User.is_active == True,
            User.metadata_json['department'].astext == 'engineering'
        ).order_by(User.created_at.desc()).all()
        
        print(f"Active engineers: {len(active_engineers)}")
        
    except Exception as e:
        session.rollback()
        print(f"Error in ORM operations: {e}")
        
    finally:
        session.close()

demonstrate_orm_operations()

Advanced YQL Features

def demonstrate_yql_features():
    """Demonstrate YQL-specific features through SQLAlchemy."""
    
    session = Session()
    
    try:
        # Use YQL functions through SQLAlchemy
        query = session.query(
            User.name,
            sa.func.ListLength(
                sa.func.String_SplitToList(User.name, sa.literal(" "))
            ).label("name_parts_count")
        ).filter(User.is_active == True)
        
        for user_name, parts_count in query:
            print(f"User: {user_name}, Name parts: {parts_count}")
        
        # YQL lambda expressions (requires custom compilation)
        # This would need additional dialect customization
        
        # Window functions
        ranked_users = session.query(
            User.name,
            User.age,
            sa.func.row_number().over(
                order_by=User.age.desc()
            ).label("age_rank")
        ).all()
        
        for name, age, rank in ranked_users:
            print(f"Rank {rank}: {name} (age {age})")
        
        # JSON operations
        engineering_users = session.query(User).filter(
            sa.func.JsonValue(
                User.metadata_json,
                'strict $.department'
            ) == 'engineering'
        ).all()
        
        print(f"Engineering users: {len(engineering_users)}")
        
    except Exception as e:
        session.rollback()
        print(f"Error in YQL operations: {e}")
        
    finally:
        session.close()

demonstrate_yql_features()

Table Reflection and Inspection

def inspect_ydb_schema():
    """Inspect YDB schema through SQLAlchemy reflection."""
    
    # Reflect existing tables
    metadata = sa.MetaData()
    metadata.reflect(bind=engine)
    
    print("Reflected tables:")
    for table_name, table in metadata.tables.items():
        print(f"\nTable: {table_name}")
        print("Columns:")
        
        for column in table.columns:
            nullable = "NULL" if column.nullable else "NOT NULL"
            primary = "PRIMARY KEY" if column.primary_key else ""
            print(f"  {column.name}: {column.type} {nullable} {primary}")
        
        # Print indexes if any
        if table.indexes:
            print("Indexes:")
            for index in table.indexes:
                unique = "UNIQUE" if index.unique else ""
                columns = ", ".join([col.name for col in index.columns])
                print(f"  {index.name}: {unique} ({columns})")
    
    # Get table info using engine
    inspector = sa.inspect(engine)
    
    print("\nInspected table names:")
    table_names = inspector.get_table_names()
    for name in table_names:
        print(f"  {name}")
        
        # Get column info
        columns = inspector.get_columns(name)
        for col in columns:
            print(f"    {col['name']}: {col['type']} (nullable: {col['nullable']})")

inspect_ydb_schema()

Custom YDB Types Usage

from decimal import Decimal
from datetime import datetime

def demonstrate_custom_types():
    """Demonstrate YDB-specific SQLAlchemy types."""
    
    # Define table with YDB-specific types
    class YdbTypeDemo(Base):
        __tablename__ = 'ydb_type_demo'
        
        id = sa.Column(ydb.sqlalchemy.UInt64, primary_key=True)
        small_int = sa.Column(ydb.sqlalchemy.UInt8)
        medium_int = sa.Column(ydb.sqlalchemy.UInt32) 
        large_int = sa.Column(ydb.sqlalchemy.UInt64)
        precise_decimal = sa.Column(ydb.sqlalchemy.YdbDecimal(precision=22, scale=9))
        timestamp_field = sa.Column(ydb.sqlalchemy.YdbDateTime)
        json_data = sa.Column(ydb.sqlalchemy.YdbJson)
    
    # Create table (would need to be done outside SQLAlchemy for YDB)
    # Base.metadata.create_all(engine)  # Not supported in YDB
    
    session = Session()
    
    try:
        # Insert with YDB types
        demo_record = YdbTypeDemo(
            id=1,
            small_int=255,  # Max UInt8
            medium_int=4294967295,  # Max UInt32
            large_int=18446744073709551615,  # Max UInt64
            precise_decimal=Decimal('123456789.123456789'),
            timestamp_field=datetime.now(),
            json_data={
                "nested": {"key": "value"},
                "array": [1, 2, 3],
                "boolean": True
            }
        )
        
        session.add(demo_record)
        session.commit()
        
        # Query with type-specific operations
        results = session.query(YdbTypeDemo).filter(
            YdbTypeDemo.small_int > 100,
            YdbTypeDemo.precise_decimal > Decimal('100')
        ).all()
        
        for record in results:
            print(f"ID: {record.id}")
            print(f"Small int: {record.small_int}")
            print(f"Precise decimal: {record.precise_decimal}")
            print(f"JSON data: {record.json_data}")
        
    except Exception as e:
        session.rollback()
        print(f"Error with custom types: {e}")
        
    finally:
        session.close()

demonstrate_custom_types()

Connection Management

def demonstrate_connection_management():
    """Demonstrate advanced connection management with SQLAlchemy."""
    
    # Create engine with custom configuration
    engine = ydb.sqlalchemy.YdbEngineBuilder()\
        .with_endpoint("grpc://localhost:2136")\
        .with_database("/local")\
        .with_credentials(ydb.AnonymousCredentials())\
        .with_pool_size(10)\
        .build()
    
    # Connection pooling with custom settings
    engine_with_pool = sa.create_engine(
        "yql://localhost:2136/local",
        pool_size=20,
        max_overflow=30,
        pool_timeout=30,
        pool_recycle=3600,  # Recycle connections every hour
        echo=True  # Log SQL queries
    )
    
    # Use connection context managers
    with engine.connect() as connection:
        # Execute raw YQL
        result = connection.execute(sa.text("""
            SELECT COUNT(*) as user_count 
            FROM users 
            WHERE is_active = true
        """))
        
        count = result.scalar()
        print(f"Active users: {count}")
        
        # Execute with parameters
        result = connection.execute(
            sa.text("SELECT * FROM users WHERE age > :min_age"),
            {"min_age": 25}
        )
        
        for row in result:
            print(f"User: {row.name}, Age: {row.age}")
    
    # Transaction management
    with engine.begin() as connection:
        # All operations in this block are in a transaction
        connection.execute(
            sa.text("UPDATE users SET is_active = false WHERE age < :min_age"),
            {"min_age": 18}
        )
        
        connection.execute(
            sa.text("INSERT INTO audit_log (action, timestamp) VALUES (:action, :ts)"),
            {"action": "deactivated_minors", "ts": datetime.now()}
        )
        # Automatically commits on successful exit
    
    # Session with custom configuration
    SessionFactory = sessionmaker(
        bind=engine,
        expire_on_commit=False,
        autoflush=True,
        autocommit=False
    )
    
    session = SessionFactory()
    try:
        # Perform operations
        users = session.query(User).limit(10).all()
        print(f"Retrieved {len(users)} users")
        
    finally:
        session.close()

demonstrate_connection_management()

Error Handling with SQLAlchemy

def handle_sqlalchemy_errors():
    """Demonstrate error handling with SQLAlchemy and YDB."""
    
    session = Session()
    
    try:
        # Operation that might fail
        user = User(
            id=1,  # Might conflict with existing ID
            name="Test User",
            email="test@example.com"
        )
        
        session.add(user)
        session.commit()
        
    except sa.exc.IntegrityError as e:
        session.rollback()
        print(f"Integrity error (likely duplicate key): {e}")
        
    except sa.exc.OperationalError as e:
        session.rollback()
        print(f"Operational error (YDB-specific): {e}")
        
        # Check if it's a retryable YDB error
        original_error = e.orig
        if isinstance(original_error, ydb.RetryableError):
            print("This error can be retried")
        elif isinstance(original_error, ydb.BadRequestError):
            print("Bad request - fix the query")
        
    except sa.exc.DatabaseError as e:
        session.rollback()
        print(f"Database error: {e}")
        
    except Exception as e:
        session.rollback()
        print(f"Unexpected error: {e}")
        
    finally:
        session.close()
    
    # Retry logic with SQLAlchemy
    def retry_sqlalchemy_operation(operation_func, max_retries=3):
        """Retry SQLAlchemy operations with YDB error handling."""
        
        for attempt in range(max_retries):
            session = Session()
            
            try:
                result = operation_func(session)
                session.commit()
                return result
                
            except sa.exc.OperationalError as e:
                session.rollback()
                
                # Check if the underlying YDB error is retryable
                if hasattr(e, 'orig') and isinstance(e.orig, ydb.RetryableError):
                    if attempt < max_retries - 1:
                        backoff_time = 2 ** attempt
                        print(f"Retrying in {backoff_time}s (attempt {attempt + 1})")
                        time.sleep(backoff_time)
                        continue
                
                raise
                
            except Exception as e:
                session.rollback()
                raise
                
            finally:
                session.close()
        
        raise RuntimeError("Max retries exceeded")
    
    # Use retry logic
    def create_user_operation(session):
        user = User(
            id=9999,
            name="Retry Test User",
            email="retry@example.com"
        )
        session.add(user)
        return user
    
    try:
        user = retry_sqlalchemy_operation(create_user_operation)
        print(f"Created user with retry: {user}")
        
    except Exception as e:
        print(f"Failed even with retries: {e}")

handle_sqlalchemy_errors()

Performance Optimization

def optimize_sqlalchemy_performance():
    """Demonstrate performance optimization techniques."""
    
    # Bulk operations (more efficient than individual inserts)
    def bulk_insert_users(user_data_list):
        """Efficiently insert multiple users."""
        
        session = Session()
        
        try:
            # Method 1: bulk_insert_mappings (fastest)
            session.bulk_insert_mappings(User, user_data_list)
            session.commit()
            
        except Exception as e:
            session.rollback()
            print(f"Bulk insert failed: {e}")
            
        finally:
            session.close()
    
    # Efficient querying with eager loading
    def efficient_user_queries():
        """Demonstrate efficient querying patterns."""
        
        session = Session()
        
        try:
            # Use specific columns instead of SELECT *
            user_summaries = session.query(
                User.id,
                User.name,
                User.email
            ).filter(User.is_active == True).all()
            
            # Use LIMIT to avoid large result sets
            recent_users = session.query(User)\
                .order_by(User.created_at.desc())\
                .limit(100)\
                .all()
            
            # Use EXISTS for existence checks
            has_active_users = session.query(
                session.query(User)
                .filter(User.is_active == True)
                .exists()
            ).scalar()
            
            print(f"Has active users: {has_active_users}")
            
            # Batch processing for large datasets
            batch_size = 1000
            offset = 0
            
            while True:
                batch = session.query(User)\
                    .offset(offset)\
                    .limit(batch_size)\
                    .all()
                
                if not batch:
                    break
                
                # Process batch
                for user in batch:
                    # Process individual user
                    pass
                
                offset += batch_size
            
        finally:
            session.close()
    
    # Connection optimization
    def optimize_connections():
        """Optimize connection usage."""
        
        # Use connection pooling
        optimized_engine = sa.create_engine(
            "yql://localhost:2136/local",
            pool_size=20,           # Number of connections to maintain
            max_overflow=50,        # Additional connections when needed
            pool_timeout=30,        # Timeout when getting connection
            pool_recycle=3600,      # Recycle connections after 1 hour
            pool_pre_ping=True,     # Verify connections before use
            echo_pool=True          # Log pool events
        )
        
        # Use scoped sessions for thread safety
        from sqlalchemy.orm import scoped_session
        
        ScopedSession = scoped_session(sessionmaker(bind=optimized_engine))
        
        # Session will be automatically managed per thread
        session = ScopedSession()
        
        try:
            users = session.query(User).limit(10).all()
            return users
            
        finally:
            ScopedSession.remove()  # Clean up thread-local session
    
    # Generate test data
    test_users = [
        {
            "id": i,
            "name": f"User {i}",
            "email": f"user{i}@example.com",
            "age": 20 + (i % 50),
            "is_active": i % 10 != 0
        }
        for i in range(1000, 2000)
    ]
    
    # Perform bulk insert
    print("Starting bulk insert...")
    import time
    start_time = time.time()
    
    bulk_insert_users(test_users)
    
    end_time = time.time()
    print(f"Bulk insert completed in {end_time - start_time:.2f} seconds")
    
    # Run efficient queries
    efficient_user_queries()

optimize_sqlalchemy_performance()

Type Definitions

# Type aliases for SQLAlchemy integration
YdbEngine = sa.Engine
YdbConnection = sa.Connection
YdbSession = sa.orm.Session
YdbMetadata = sa.MetaData

# ORM types
YdbModel = declarative_base()
YdbColumn = sa.Column
YdbTable = sa.Table

# Query types
YdbQuery = sa.orm.Query
YdbResult = sa.engine.Result
YdbResultProxy = sa.engine.ResultProxy

# Type mapping
SqlaType = sa.types.TypeEngine
YdbType = Union[UInt8, UInt32, UInt64, YdbDecimal, YdbDateTime, YdbJson]
TypeMapping = Dict[str, SqlaType]

# Connection string format
# yql://[host]:[port]/[database]?[parameters]
ConnectionString = str

Install with Tessl CLI

npx tessl i tessl/pypi-ydb

docs

async-operations.md

authentication.md

data-types.md

dbapi-interface.md

driver-connection.md

error-handling.md

index.md

query-service.md

schema-operations.md

sqlalchemy-integration.md

table-operations.md

topic-operations.md

tile.json