Officially supported Python client for YDB distributed SQL database
SQLAlchemy dialect for YDB enabling ORM usage, custom YDB types, connection management, and standard SQLAlchemy operations.
The YDB SQLAlchemy dialect enables using YDB with SQLAlchemy ORM and core functionality.
import ydb.sqlalchemy
class YqlDialect(DefaultDialect):
"""
SQLAlchemy dialect for YDB (YQL).
Provides SQLAlchemy integration for YDB database operations
including ORM support, query compilation, and type mapping.
"""
name = "yql"
supports_alter = False
max_identifier_length = 63
supports_sane_rowcount = False
supports_statement_cache = False
supports_native_enum = False
supports_native_boolean = True
supports_smallserial = False
supports_sequences = False
sequences_optional = True
preexecute_autoincrement_sequences = True
postfetch_lastrowid = False
supports_default_values = False
supports_empty_insert = False
supports_multivalues_insert = True
default_paramstyle = "qmark"
isolation_level = None
@staticmethod
def dbapi():
"""
Get YDB DB-API module.
Returns:
module: YDB DB-API 2.0 module
"""
import ydb.dbapi
return ydb.dbapi
def get_columns(
self,
connection,
table_name: str,
schema: str = None,
**kwargs
) -> List[Dict]:
"""
Get table column information for SQLAlchemy reflection.
Args:
connection: SQLAlchemy connection
table_name (str): Table name
schema (str, optional): Schema name (not supported)
**kwargs: Additional arguments
Returns:
List[Dict]: Column information dictionaries
"""
def has_table(
self,
connection,
table_name: str,
schema: str = None
) -> bool:
"""
Check if table exists.
Args:
connection: SQLAlchemy connection
table_name (str): Table name to check
schema (str, optional): Schema name (not supported)
Returns:
bool: True if table exists
"""
def register_dialect(
name: str = "yql",
module: str = None,
cls: str = "YqlDialect"
):
"""
Register YDB SQLAlchemy dialect.
Args:
name (str): Dialect name
module (str, optional): Module path
cls (str): Dialect class name
"""Custom SQLAlchemy types for YDB-specific data types.
from sqlalchemy.types import Integer, TypeDecorator
class UInt8(Integer):
"""
SQLAlchemy type for YDB UInt8.
Maps to YDB's 8-bit unsigned integer type.
"""
def __init__(self):
"""Create UInt8 type."""
super().__init__()
class UInt32(Integer):
"""
SQLAlchemy type for YDB UInt32.
Maps to YDB's 32-bit unsigned integer type.
"""
def __init__(self):
"""Create UInt32 type."""
super().__init__()
class UInt64(Integer):
"""
SQLAlchemy type for YDB UInt64.
Maps to YDB's 64-bit unsigned integer type.
"""
def __init__(self):
"""Create UInt64 type."""
super().__init__()
class YdbDateTime(TypeDecorator):
"""
SQLAlchemy type decorator for YDB datetime handling.
Handles conversion between Python datetime and YDB datetime types.
"""
impl = sa.DATETIME
cache_ok = True
class YdbDecimal(TypeDecorator):
"""
SQLAlchemy type decorator for YDB decimal type.
Handles YDB decimal precision and scale requirements.
"""
impl = sa.DECIMAL
cache_ok = True
def __init__(self, precision: int = 22, scale: int = 9):
"""
Create YDB decimal type.
Args:
precision (int): Decimal precision (max 35)
scale (int): Decimal scale (max precision)
"""
super().__init__(precision=precision, scale=scale)
class YdbJson(TypeDecorator):
"""
SQLAlchemy type decorator for YDB JSON types.
Handles both Json and JsonDocument YDB types.
"""
impl = sa.JSON
cache_ok = TrueCustom type compiler for translating SQLAlchemy types to YQL types.
class YqlTypeCompiler(GenericTypeCompiler):
"""
Type compiler for converting SQLAlchemy types to YQL.
"""
def visit_VARCHAR(self, type_, **kwargs) -> str:
"""
Convert VARCHAR to YQL STRING type.
Args:
type_: SQLAlchemy VARCHAR type
**kwargs: Additional arguments
Returns:
str: YQL type string
"""
return "STRING"
def visit_unicode(self, type_, **kwargs) -> str:
"""
Convert Unicode to YQL UTF8 type.
Args:
type_: SQLAlchemy Unicode type
**kwargs: Additional arguments
Returns:
str: YQL type string
"""
return "UTF8"
def visit_TEXT(self, type_, **kwargs) -> str:
"""
Convert TEXT to YQL UTF8 type.
Args:
type_: SQLAlchemy TEXT type
**kwargs: Additional arguments
Returns:
str: YQL type string
"""
return "UTF8"
def visit_BOOLEAN(self, type_, **kwargs) -> str:
"""
Convert BOOLEAN to YQL BOOL type.
Args:
type_: SQLAlchemy BOOLEAN type
**kwargs: Additional arguments
Returns:
str: YQL type string
"""
return "BOOL"
def visit_FLOAT(self, type_, **kwargs) -> str:
"""
Convert FLOAT to YQL DOUBLE type.
Args:
type_: SQLAlchemy FLOAT type
**kwargs: Additional arguments
Returns:
str: YQL type string
"""
return "DOUBLE"
def visit_uint32(self, type_, **kwargs) -> str:
"""
Convert UInt32 to YQL UInt32 type.
Args:
type_: YDB UInt32 type
**kwargs: Additional arguments
Returns:
str: YQL type string
"""
return "UInt32"
def visit_uint64(self, type_, **kwargs) -> str:
"""
Convert UInt64 to YQL UInt64 type.
Args:
type_: YDB UInt64 type
**kwargs: Additional arguments
Returns:
str: YQL type string
"""
return "UInt64"
def visit_uint8(self, type_, **kwargs) -> str:
"""
Convert UInt8 to YQL UInt8 type.
Args:
type_: YDB UInt8 type
**kwargs: Additional arguments
Returns:
str: YQL type string
"""
return "UInt8"Custom SQL compiler for generating YQL from SQLAlchemy expressions.
class YqlCompiler(SQLCompiler):
"""
SQL compiler for generating YQL queries from SQLAlchemy expressions.
"""
def group_by_clause(self, select, **kwargs):
"""
Generate GROUP BY clause for YQL.
Args:
select: SQLAlchemy select object
**kwargs: Additional arguments
Returns:
str: YQL GROUP BY clause
"""
kwargs.update(within_columns_clause=True)
return super().group_by_clause(select, **kwargs)
def visit_function(self, func, add_to_result_map=None, **kwargs):
"""
Visit function expressions and convert to YQL syntax.
Args:
func: SQLAlchemy function expression
add_to_result_map: Result map callback
**kwargs: Additional arguments
Returns:
str: YQL function call
"""
# Handle YQL namespace syntax (::) instead of SQL (.)
disp = getattr(self, f"visit_{func.name.lower()}_func", None)
if disp:
return disp(func, **kwargs)
# Convert function names to YQL format
name = func.name
if hasattr(func, 'packagenames') and func.packagenames:
name = "::".join(func.packagenames + [name])
return f"{name}{self.function_argspec(func, **kwargs)}"
def visit_lambda(self, lambda_, **kwargs):
"""
Visit lambda expressions for YQL.
Args:
lambda_: Lambda expression
**kwargs: Additional arguments
Returns:
str: YQL lambda expression
"""
func = lambda_.func
spec = inspect_getfullargspec(func)
# Build YQL lambda syntax: ($arg1, $arg2) -> { RETURN expression; }
args_str = "(" + ", ".join(f"${arg}" for arg in spec.args) + ")"
# Create literal columns for lambda parameters
args = [literal_column(f"${arg}") for arg in spec.args]
body_expr = func(*args)
body_str = self.process(body_expr, **kwargs)
return f"{args_str} -> {{ RETURN {body_str}; }}"
class ParametrizedFunction(functions.Function):
"""
SQLAlchemy function with YQL-style parameters.
Supports YQL functions that take type parameters.
"""
__visit_name__ = "parametrized_function"
def __init__(self, name: str, params: List, *args, **kwargs):
"""
Create parametrized function.
Args:
name (str): Function name
params (List): Type parameters
*args: Function arguments
**kwargs: Additional arguments
"""
super().__init__(name, *args, **kwargs)
self._func_name = name
self._func_params = params
self.params_expr = ClauseList(
operator=functions.operators.comma_op,
group_contents=True,
*params
).self_group()Custom identifier handling for YQL naming conventions.
class YqlIdentifierPreparer(IdentifierPreparer):
"""
Identifier preparer for YQL naming conventions.
Handles quoting and escaping of identifiers in YQL.
"""
def __init__(self, dialect):
"""
Create YQL identifier preparer.
Args:
dialect: YQL dialect instance
"""
super().__init__(
dialect,
initial_quote="`",
final_quote="`",
)
def _requires_quotes(self, value: str) -> bool:
"""
Determine if identifier requires quoting.
Args:
value (str): Identifier value
Returns:
bool: True if quoting is required
"""
# Force quoting unless already quoted
return not (
value.startswith(self.initial_quote) and
value.endswith(self.final_quote)
)
def quote_identifier(self, value: str) -> str:
"""
Quote identifier for YQL.
Args:
value (str): Identifier to quote
Returns:
str: Quoted identifier
"""
if self._requires_quotes(value):
return f"{self.initial_quote}{value}{self.final_quote}"
return valueUtilities for creating SQLAlchemy engines and connections to YDB.
def create_ydb_engine(
endpoint: str,
database: str,
credentials: ydb.Credentials = None,
**engine_kwargs
) -> sa.Engine:
"""
Create SQLAlchemy engine for YDB.
Args:
endpoint (str): YDB endpoint URL
database (str): Database path
credentials (ydb.Credentials, optional): Authentication credentials
**engine_kwargs: Additional engine arguments
Returns:
sa.Engine: SQLAlchemy engine configured for YDB
"""
# Register YDB dialect
register_dialect()
# Build connection string
connection_string = f"yql://{endpoint.replace('grpc://', '')}{database}"
# Create engine with YDB-specific settings
engine = sa.create_engine(
connection_string,
credentials=credentials,
**engine_kwargs
)
return engine
def get_ydb_metadata(engine: sa.Engine) -> sa.MetaData:
"""
Get SQLAlchemy metadata with YDB table reflection.
Args:
engine (sa.Engine): YDB SQLAlchemy engine
Returns:
sa.MetaData: Metadata with reflected tables
"""
metadata = sa.MetaData()
metadata.reflect(bind=engine)
return metadata
class YdbEngineBuilder:
"""
Builder for YDB SQLAlchemy engines with configuration.
"""
def __init__(self):
"""Create engine builder."""
self.endpoint = None
self.database = None
self.credentials = None
self.engine_options = {}
def with_endpoint(self, endpoint: str) -> 'YdbEngineBuilder':
"""
Set YDB endpoint.
Args:
endpoint (str): YDB endpoint URL
Returns:
YdbEngineBuilder: Self for chaining
"""
self.endpoint = endpoint
return self
def with_database(self, database: str) -> 'YdbEngineBuilder':
"""
Set database path.
Args:
database (str): Database path
Returns:
YdbEngineBuilder: Self for chaining
"""
self.database = database
return self
def with_credentials(self, credentials: ydb.Credentials) -> 'YdbEngineBuilder':
"""
Set authentication credentials.
Args:
credentials (ydb.Credentials): YDB credentials
Returns:
YdbEngineBuilder: Self for chaining
"""
self.credentials = credentials
return self
def with_pool_size(self, size: int) -> 'YdbEngineBuilder':
"""
Set connection pool size.
Args:
size (int): Pool size
Returns:
YdbEngineBuilder: Self for chaining
"""
self.engine_options['pool_size'] = size
return self
def build(self) -> sa.Engine:
"""
Build SQLAlchemy engine.
Returns:
sa.Engine: Configured YDB engine
"""
if not self.endpoint or not self.database:
raise ValueError("Endpoint and database are required")
return create_ydb_engine(
self.endpoint,
self.database,
self.credentials,
**self.engine_options
)import sqlalchemy as sa
import ydb.sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Register YDB dialect
ydb.sqlalchemy.register_dialect()
# Create engine
engine = sa.create_engine("yql://localhost:2136/local")
# Create metadata and base class
Base = declarative_base()
metadata = sa.MetaData()
# Define ORM model
class User(Base):
__tablename__ = 'users'
id = sa.Column(ydb.sqlalchemy.UInt64, primary_key=True)
name = sa.Column(sa.Unicode(255), nullable=False)
email = sa.Column(sa.Unicode(255), unique=True)
age = sa.Column(ydb.sqlalchemy.UInt32)
created_at = sa.Column(sa.DateTime, default=sa.func.CurrentUtcDatetime())
is_active = sa.Column(sa.Boolean, default=True)
metadata_json = sa.Column(sa.JSON)
def __repr__(self):
return f"<User(id={self.id}, name='{self.name}', email='{self.email}')>"
# Create session factory
Session = sessionmaker(bind=engine)def demonstrate_orm_operations():
"""Demonstrate basic ORM operations with YDB."""
session = Session()
try:
# Create new user
new_user = User(
id=1001,
name="Alice Johnson",
email="alice@example.com",
age=25,
metadata_json={"department": "engineering", "level": "senior"}
)
session.add(new_user)
session.commit()
# Query users
users = session.query(User).filter(User.age > 21).all()
print(f"Found {len(users)} users over 21")
# Update user
user = session.query(User).filter(User.email == "alice@example.com").first()
if user:
user.age = 26
user.metadata_json = {"department": "engineering", "level": "lead"}
session.commit()
print(f"Updated user: {user}")
# Aggregate query
avg_age = session.query(sa.func.avg(User.age)).scalar()
print(f"Average age: {avg_age}")
# Complex query with joins and filters
active_engineers = session.query(User).filter(
User.is_active == True,
User.metadata_json['department'].astext == 'engineering'
).order_by(User.created_at.desc()).all()
print(f"Active engineers: {len(active_engineers)}")
except Exception as e:
session.rollback()
print(f"Error in ORM operations: {e}")
finally:
session.close()
demonstrate_orm_operations()def demonstrate_yql_features():
"""Demonstrate YQL-specific features through SQLAlchemy."""
session = Session()
try:
# Use YQL functions through SQLAlchemy
query = session.query(
User.name,
sa.func.ListLength(
sa.func.String_SplitToList(User.name, sa.literal(" "))
).label("name_parts_count")
).filter(User.is_active == True)
for user_name, parts_count in query:
print(f"User: {user_name}, Name parts: {parts_count}")
# YQL lambda expressions (requires custom compilation)
# This would need additional dialect customization
# Window functions
ranked_users = session.query(
User.name,
User.age,
sa.func.row_number().over(
order_by=User.age.desc()
).label("age_rank")
).all()
for name, age, rank in ranked_users:
print(f"Rank {rank}: {name} (age {age})")
# JSON operations
engineering_users = session.query(User).filter(
sa.func.JsonValue(
User.metadata_json,
'strict $.department'
) == 'engineering'
).all()
print(f"Engineering users: {len(engineering_users)}")
except Exception as e:
session.rollback()
print(f"Error in YQL operations: {e}")
finally:
session.close()
demonstrate_yql_features()def inspect_ydb_schema():
"""Inspect YDB schema through SQLAlchemy reflection."""
# Reflect existing tables
metadata = sa.MetaData()
metadata.reflect(bind=engine)
print("Reflected tables:")
for table_name, table in metadata.tables.items():
print(f"\nTable: {table_name}")
print("Columns:")
for column in table.columns:
nullable = "NULL" if column.nullable else "NOT NULL"
primary = "PRIMARY KEY" if column.primary_key else ""
print(f" {column.name}: {column.type} {nullable} {primary}")
# Print indexes if any
if table.indexes:
print("Indexes:")
for index in table.indexes:
unique = "UNIQUE" if index.unique else ""
columns = ", ".join([col.name for col in index.columns])
print(f" {index.name}: {unique} ({columns})")
# Get table info using engine
inspector = sa.inspect(engine)
print("\nInspected table names:")
table_names = inspector.get_table_names()
for name in table_names:
print(f" {name}")
# Get column info
columns = inspector.get_columns(name)
for col in columns:
print(f" {col['name']}: {col['type']} (nullable: {col['nullable']})")
inspect_ydb_schema()from decimal import Decimal
from datetime import datetime
def demonstrate_custom_types():
"""Demonstrate YDB-specific SQLAlchemy types."""
# Define table with YDB-specific types
class YdbTypeDemo(Base):
__tablename__ = 'ydb_type_demo'
id = sa.Column(ydb.sqlalchemy.UInt64, primary_key=True)
small_int = sa.Column(ydb.sqlalchemy.UInt8)
medium_int = sa.Column(ydb.sqlalchemy.UInt32)
large_int = sa.Column(ydb.sqlalchemy.UInt64)
precise_decimal = sa.Column(ydb.sqlalchemy.YdbDecimal(precision=22, scale=9))
timestamp_field = sa.Column(ydb.sqlalchemy.YdbDateTime)
json_data = sa.Column(ydb.sqlalchemy.YdbJson)
# Create table (would need to be done outside SQLAlchemy for YDB)
# Base.metadata.create_all(engine) # Not supported in YDB
session = Session()
try:
# Insert with YDB types
demo_record = YdbTypeDemo(
id=1,
small_int=255, # Max UInt8
medium_int=4294967295, # Max UInt32
large_int=18446744073709551615, # Max UInt64
precise_decimal=Decimal('123456789.123456789'),
timestamp_field=datetime.now(),
json_data={
"nested": {"key": "value"},
"array": [1, 2, 3],
"boolean": True
}
)
session.add(demo_record)
session.commit()
# Query with type-specific operations
results = session.query(YdbTypeDemo).filter(
YdbTypeDemo.small_int > 100,
YdbTypeDemo.precise_decimal > Decimal('100')
).all()
for record in results:
print(f"ID: {record.id}")
print(f"Small int: {record.small_int}")
print(f"Precise decimal: {record.precise_decimal}")
print(f"JSON data: {record.json_data}")
except Exception as e:
session.rollback()
print(f"Error with custom types: {e}")
finally:
session.close()
demonstrate_custom_types()def demonstrate_connection_management():
"""Demonstrate advanced connection management with SQLAlchemy."""
# Create engine with custom configuration
engine = ydb.sqlalchemy.YdbEngineBuilder()\
.with_endpoint("grpc://localhost:2136")\
.with_database("/local")\
.with_credentials(ydb.AnonymousCredentials())\
.with_pool_size(10)\
.build()
# Connection pooling with custom settings
engine_with_pool = sa.create_engine(
"yql://localhost:2136/local",
pool_size=20,
max_overflow=30,
pool_timeout=30,
pool_recycle=3600, # Recycle connections every hour
echo=True # Log SQL queries
)
# Use connection context managers
with engine.connect() as connection:
# Execute raw YQL
result = connection.execute(sa.text("""
SELECT COUNT(*) as user_count
FROM users
WHERE is_active = true
"""))
count = result.scalar()
print(f"Active users: {count}")
# Execute with parameters
result = connection.execute(
sa.text("SELECT * FROM users WHERE age > :min_age"),
{"min_age": 25}
)
for row in result:
print(f"User: {row.name}, Age: {row.age}")
# Transaction management
with engine.begin() as connection:
# All operations in this block are in a transaction
connection.execute(
sa.text("UPDATE users SET is_active = false WHERE age < :min_age"),
{"min_age": 18}
)
connection.execute(
sa.text("INSERT INTO audit_log (action, timestamp) VALUES (:action, :ts)"),
{"action": "deactivated_minors", "ts": datetime.now()}
)
# Automatically commits on successful exit
# Session with custom configuration
SessionFactory = sessionmaker(
bind=engine,
expire_on_commit=False,
autoflush=True,
autocommit=False
)
session = SessionFactory()
try:
# Perform operations
users = session.query(User).limit(10).all()
print(f"Retrieved {len(users)} users")
finally:
session.close()
demonstrate_connection_management()def handle_sqlalchemy_errors():
"""Demonstrate error handling with SQLAlchemy and YDB."""
session = Session()
try:
# Operation that might fail
user = User(
id=1, # Might conflict with existing ID
name="Test User",
email="test@example.com"
)
session.add(user)
session.commit()
except sa.exc.IntegrityError as e:
session.rollback()
print(f"Integrity error (likely duplicate key): {e}")
except sa.exc.OperationalError as e:
session.rollback()
print(f"Operational error (YDB-specific): {e}")
# Check if it's a retryable YDB error
original_error = e.orig
if isinstance(original_error, ydb.RetryableError):
print("This error can be retried")
elif isinstance(original_error, ydb.BadRequestError):
print("Bad request - fix the query")
except sa.exc.DatabaseError as e:
session.rollback()
print(f"Database error: {e}")
except Exception as e:
session.rollback()
print(f"Unexpected error: {e}")
finally:
session.close()
# Retry logic with SQLAlchemy
def retry_sqlalchemy_operation(operation_func, max_retries=3):
"""Retry SQLAlchemy operations with YDB error handling."""
for attempt in range(max_retries):
session = Session()
try:
result = operation_func(session)
session.commit()
return result
except sa.exc.OperationalError as e:
session.rollback()
# Check if the underlying YDB error is retryable
if hasattr(e, 'orig') and isinstance(e.orig, ydb.RetryableError):
if attempt < max_retries - 1:
backoff_time = 2 ** attempt
print(f"Retrying in {backoff_time}s (attempt {attempt + 1})")
time.sleep(backoff_time)
continue
raise
except Exception as e:
session.rollback()
raise
finally:
session.close()
raise RuntimeError("Max retries exceeded")
# Use retry logic
def create_user_operation(session):
user = User(
id=9999,
name="Retry Test User",
email="retry@example.com"
)
session.add(user)
return user
try:
user = retry_sqlalchemy_operation(create_user_operation)
print(f"Created user with retry: {user}")
except Exception as e:
print(f"Failed even with retries: {e}")
handle_sqlalchemy_errors()def optimize_sqlalchemy_performance():
"""Demonstrate performance optimization techniques."""
# Bulk operations (more efficient than individual inserts)
def bulk_insert_users(user_data_list):
"""Efficiently insert multiple users."""
session = Session()
try:
# Method 1: bulk_insert_mappings (fastest)
session.bulk_insert_mappings(User, user_data_list)
session.commit()
except Exception as e:
session.rollback()
print(f"Bulk insert failed: {e}")
finally:
session.close()
# Efficient querying with eager loading
def efficient_user_queries():
"""Demonstrate efficient querying patterns."""
session = Session()
try:
# Use specific columns instead of SELECT *
user_summaries = session.query(
User.id,
User.name,
User.email
).filter(User.is_active == True).all()
# Use LIMIT to avoid large result sets
recent_users = session.query(User)\
.order_by(User.created_at.desc())\
.limit(100)\
.all()
# Use EXISTS for existence checks
has_active_users = session.query(
session.query(User)
.filter(User.is_active == True)
.exists()
).scalar()
print(f"Has active users: {has_active_users}")
# Batch processing for large datasets
batch_size = 1000
offset = 0
while True:
batch = session.query(User)\
.offset(offset)\
.limit(batch_size)\
.all()
if not batch:
break
# Process batch
for user in batch:
# Process individual user
pass
offset += batch_size
finally:
session.close()
# Connection optimization
def optimize_connections():
"""Optimize connection usage."""
# Use connection pooling
optimized_engine = sa.create_engine(
"yql://localhost:2136/local",
pool_size=20, # Number of connections to maintain
max_overflow=50, # Additional connections when needed
pool_timeout=30, # Timeout when getting connection
pool_recycle=3600, # Recycle connections after 1 hour
pool_pre_ping=True, # Verify connections before use
echo_pool=True # Log pool events
)
# Use scoped sessions for thread safety
from sqlalchemy.orm import scoped_session
ScopedSession = scoped_session(sessionmaker(bind=optimized_engine))
# Session will be automatically managed per thread
session = ScopedSession()
try:
users = session.query(User).limit(10).all()
return users
finally:
ScopedSession.remove() # Clean up thread-local session
# Generate test data
test_users = [
{
"id": i,
"name": f"User {i}",
"email": f"user{i}@example.com",
"age": 20 + (i % 50),
"is_active": i % 10 != 0
}
for i in range(1000, 2000)
]
# Perform bulk insert
print("Starting bulk insert...")
import time
start_time = time.time()
bulk_insert_users(test_users)
end_time = time.time()
print(f"Bulk insert completed in {end_time - start_time:.2f} seconds")
# Run efficient queries
efficient_user_queries()
optimize_sqlalchemy_performance()# Type aliases for SQLAlchemy integration
YdbEngine = sa.Engine
YdbConnection = sa.Connection
YdbSession = sa.orm.Session
YdbMetadata = sa.MetaData
# ORM types
YdbModel = declarative_base()
YdbColumn = sa.Column
YdbTable = sa.Table
# Query types
YdbQuery = sa.orm.Query
YdbResult = sa.engine.Result
YdbResultProxy = sa.engine.ResultProxy
# Type mapping
SqlaType = sa.types.TypeEngine
YdbType = Union[UInt8, UInt32, UInt64, YdbDecimal, YdbDateTime, YdbJson]
TypeMapping = Dict[str, SqlaType]
# Connection string format
# yql://[host]:[port]/[database]?[parameters]
ConnectionString = str