Officially supported Python client for YDB distributed SQL database
npx @tessl/cli install tessl/pypi-ydb@3.21.0

The official Python client for YDB (Yandex Database), a distributed SQL database. The SDK provides comprehensive functionality for table operations, query execution, transaction management, authentication, async operations, topic operations for streaming data, and integrations with SQLAlchemy and DB-API 2.0.
pip install ydb

import ydb

Common patterns for specific functionality:
# Core driver and connection
from ydb import Driver, DriverConfig
# Table operations and sessions
from ydb import SessionPool, Session, TxContext
# Data types
from ydb import types
# Async operations
import ydb.aio as ydb_aio
# Schema operations
from ydb import SchemeClient
# Query service
from ydb import QueryService
# Topic operations
from ydb import TopicClient
# Authentication
from ydb import AnonymousCredentials, StaticCredentials
import ydb.iam
# SQLAlchemy integration
import ydb.sqlalchemy
# DB-API 2.0 interface
import ydb.dbapi

import ydb
# Create driver with connection string
driver = ydb.Driver(
endpoint="grpc://localhost:2136",
database="/local",
credentials=ydb.AnonymousCredentials()
)
# Wait for driver to be ready
driver.wait(fail_fast=True, timeout=5)
# Create session pool
session_pool = ydb.SessionPool(driver)
def execute_query(session):
# Execute YQL query
result_sets = session.transaction(ydb.SerializableReadWrite()).execute(
"SELECT 1 AS value;",
commit_tx=True
)
for result_set in result_sets:
for row in result_set.rows:
print(f"Value: {row.value}")
# Execute query using session pool
session_pool.retry_operation_sync(execute_query)
# Clean up
session_pool.stop()
driver.stop()

import ydb
# Create table
def create_table(session):
session.create_table(
'/local/test_table',
ydb.TableDescription()
.with_column(ydb.TableColumn('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)))
.with_column(ydb.TableColumn('name', ydb.OptionalType(ydb.PrimitiveType.Utf8)))
.with_primary_key('id')
)
session_pool.retry_operation_sync(create_table)

The YDB Python SDK is organized around several key components:
ydb.aio module

Core connection functionality including driver configuration, endpoint discovery, credential management, and connection lifecycle.
class Driver:
def __init__(self, endpoint: str, database: str, credentials: Credentials = None, **kwargs): ...
def wait(self, fail_fast: bool = True, timeout: float = None) -> bool: ...
def stop(self, timeout: float = None): ...
class DriverConfig:
def __init__(self, endpoint: str, database: str, credentials: Credentials = None, **kwargs): ...
def construct_driver(config: DriverConfig) -> Driver: ...
def construct_driver_from_string(connection_string: str, **kwargs) -> Driver: ...

Driver and Connection Management
Complete authentication system supporting anonymous access, static tokens, OAuth2 token exchange, JWT authentication, service account credentials, and Yandex Cloud IAM integration.
class AnonymousCredentials(Credentials): ...
class StaticCredentials(Credentials):
def __init__(self, token: str): ...
class OAuth2TokenExchangeCredentials(Credentials):
def __init__(self, token_endpoint: str, **kwargs): ...
def credentials_from_env_variables() -> Credentials: ...
def default_credentials(credentials: Credentials = None) -> Credentials: ...

Authentication and Credentials
Comprehensive table operations including session management, transaction handling, query execution, table creation/modification, bulk operations, and point reads.
class SessionPool:
def __init__(self, driver: Driver, size: int = None): ...
def retry_operation_sync(self, callee: callable, *args, **kwargs): ...
def acquire(self, timeout: float = None) -> Session: ...
class Session:
def execute_query(self, query: str, parameters: dict = None, **kwargs): ...
def create_table(self, path: str, table_description: TableDescription, **kwargs): ...
def transaction(self, tx_mode: TxMode = None) -> TxContext: ...
class TxContext:
def execute(self, query: str, parameters: dict = None, commit_tx: bool = False): ...
def commit(self): ...
def rollback(self): ...

Complete type system mapping Python types to YDB types, including primitives, optionals, containers, and advanced types like decimals and variants.
class PrimitiveType:
Bool: Type
Int8: Type
Uint8: Type
Int16: Type
Uint16: Type
Int32: Type
Uint32: Type
Int64: Type
Uint64: Type
Float: Type
Double: Type
String: Type
Utf8: Type
Yson: Type
Json: Type
Uuid: Type
Date: Type
Datetime: Type
Timestamp: Type
Interval: Type
def Optional(item_type: Type) -> OptionalType: ...
def List(item_type: Type) -> ListType: ...
def Tuple(*item_types: Type) -> TupleType: ...
def Struct(**kwargs: Type) -> StructType: ...
def Dict(key_type: Type, value_type: Type) -> DictType: ...

Modern query interface supporting session management, transaction execution, result streaming, and advanced query options with the new Query service.
class QueryService:
def __init__(self, driver: Driver): ...
class QuerySession:
def execute_query(self, query: str, **kwargs): ...
def transaction(self, tx_settings: TxSettings = None) -> QueryTransaction: ...
class QueryTransaction:
def execute(self, query: str, parameters: dict = None): ...
def commit(self): ...
def rollback(self): ...

Full async/await interface providing asynchronous versions of all core functionality through the ydb.aio module.
import ydb.aio as ydb_aio
class Driver:
async def __aenter__(self) -> 'Driver': ...
async def __aexit__(self, *args): ...
async def wait(self, fail_fast: bool = True, timeout: float = None) -> bool: ...
class SessionPool:
async def acquire(self, timeout: float = None) -> Session: ...
async def retry_operation(self, callee: callable, *args, **kwargs): ...
class Session:
async def execute_query(self, query: str, parameters: dict = None, **kwargs): ...
async def create_table(self, path: str, table_description: TableDescription, **kwargs): ...

Database schema management including directory operations, table schema inspection, permissions management, and metadata operations.
class SchemeClient:
def __init__(self, driver: Driver): ...
def make_directory(self, path: str, **kwargs): ...
def remove_directory(self, path: str, **kwargs): ...
def list_directory(self, path: str, **kwargs): ...
def describe_path(self, path: str, **kwargs): ...
class SchemeEntry:
name: str
type: SchemeEntryType
is_directory: bool
is_table: bool
class TableDescription:
def with_column(self, column: TableColumn) -> 'TableDescription': ...
def with_primary_key(self, *key_names: str) -> 'TableDescription': ...

Streaming data operations including topic creation, message publishing, message consuming, and topic administration.
class TopicClient:
def __init__(self, driver: Driver): ...
class TopicWriter:
def __init__(self, driver: Driver, topic_path: str, **kwargs): ...
def write(self, messages: List[ProducerMessage]): ...
def flush(self): ...
class TopicReader:
def __init__(self, driver: Driver, **kwargs): ...
def receive_message(self) -> Message: ...
def commit(self, message: Message): ...
class ProducerMessage:
def __init__(self, data: bytes, **kwargs): ...

Comprehensive error handling with detailed error hierarchies, retry strategies, backoff configurations, and operation result processing.
class Error(Exception):
status: int
message: str
issues: List[Issue]
class RetryableError(Error): ...
class BadRequestError(Error): ...
class UnauthorizedError(Error): ...
class NotFoundError(Error): ...
class AlreadyExistsError(Error): ...
class RetrySettings:
def __init__(self, max_retries: int = 10, **kwargs): ...
class BackoffSettings:
def __init__(self, max_backoff: float = 32.0, **kwargs): ...
def retry_operation_sync(callee: callable, retry_settings: RetrySettings = None, *args, **kwargs): ...

SQLAlchemy dialect for YDB enabling ORM usage, custom YDB types, connection management, and standard SQLAlchemy operations.
import ydb.sqlalchemy
def register_dialect(): ...
class UInt32(Integer): ...
class UInt64(Integer): ...
class UInt8(Integer): ...
# Connection string format
# ydb://endpoint/database?param=value

Standard Python DB-API 2.0 compliant interface providing familiar database connectivity patterns.
import ydb.dbapi
def connect(endpoint: str, database: str, **kwargs) -> Connection: ...
class Connection:
def cursor(self) -> Cursor: ...
def commit(self): ...
def rollback(self): ...
def close(self): ...
class Cursor:
def execute(self, query: str, parameters: tuple = None): ...
def executemany(self, query: str, seq_of_parameters: List[tuple]): ...
def fetchone(self) -> tuple: ...
def fetchall(self) -> List[tuple]: ...
def fetchmany(self, size: int = None) -> List[tuple]: ...

YQL script execution and management for running complex analytical queries and operations outside of table service.
class ScriptingClient:
def __init__(self, driver: Driver): ...
def execute_yql_script(self, yql_script: str, **kwargs): ...
def explain_yql_script(self, yql_script: str, **kwargs): ...
class YqlQueryResult:
result_sets: List[ResultSet]
status: int
class YqlExplainResult:
plan: str
ast: str

Management of long-running operations including export/import operations and other asynchronous tasks.
class Operation:
def __init__(self, operation_id: str, driver: Driver): ...
def cancel(self): ...
def forget(self): ...
def get(self): ...
class OperationClient:
def __init__(self, driver: Driver): ...
def cancel_operation(self, operation: Operation): ...
def forget_operation(self, operation: Operation): ...
def get_operation(self, operation: Operation): ...

Data export to external systems (S3, Yandex Object Storage) and import from external sources.
class ExportToS3Settings:
def __init__(self, endpoint: str, bucket: str, **kwargs): ...
class ExportToS3Operation(Operation):
def __init__(self, driver: Driver, source_path: str, settings: ExportToS3Settings): ...
class ImportFromS3Settings:
def __init__(self, endpoint: str, bucket: str, **kwargs): ...
class ImportFromS3Operation(Operation):
def __init__(self, driver: Driver, destination_path: str, settings: ImportFromS3Settings): ...
class ExportProgress:
UNSPECIFIED: int
PREPARING: int
TRANSFER_DATA: int
DONE: int
CANCELLATION: int
CANCELLED: int
class ImportProgress:
UNSPECIFIED: int
PREPARING: int
TRANSFER_DATA: int
BUILD_INDEXES: int
DONE: int
CANCELLATION: int
CANCELLED: int

Configurable retry logic and backoff strategies for handling transient failures.
class RetrySettings:
def __init__(self, max_retries: int = 10, **kwargs): ...
class BackoffSettings:
def __init__(self, max_backoff: float = 32.0, **kwargs): ...
def retry_operation_sync(callee: callable, retry_settings: RetrySettings = None, *args, **kwargs): ...
def retry_operation_async(callee: callable, retry_settings: RetrySettings = None, *args, **kwargs): ...

Distributed tracing integration for monitoring and debugging database operations.
class Tracer:
def __init__(self, **kwargs): ...
class TraceLevel:
OFF: int
ERROR: int
WARN: int
INFO: int
DEBUG: int
TRACE: int

Global configuration options affecting SDK behavior across all operations.
def global_allow_truncated_result(enabled: bool): ...
def global_allow_split_transactions(enabled: bool): ...

# Transaction modes
class TxMode:
SERIALIZABLE_RW: TxMode
ONLINE_RO: TxMode
STALE_RO: TxMode
SNAPSHOT_RO: TxMode
# Time units
class Unit:
SECONDS: Unit
MILLISECONDS: Unit
MICROSECONDS: Unit
NANOSECONDS: Unit
# Schema entry types
class SchemeEntryType:
DIRECTORY: SchemeEntryType
TABLE: SchemeEntryType
TOPIC: SchemeEntryType
DATABASE: SchemeEntryType
# RPC compression options
class RPCCompression:
NoCompression: int
Deflate: int
Gzip: int