tessl/pypi-ydb

Officially supported Python client for YDB distributed SQL database

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/ydb@3.21.x

To install, run

npx @tessl/cli install tessl/pypi-ydb@3.21.0


YDB Python SDK

The official Python client for YDB (Yandex Database), a distributed SQL database. The SDK covers table operations, query execution, transaction management, authentication, async operations, and topic operations for streaming data, and it integrates with SQLAlchemy and DB-API 2.0.

Package Information

  • Package Name: ydb
  • Language: Python
  • Installation: pip install ydb
  • Repository: https://github.com/ydb-platform/ydb-python-sdk
  • Documentation: https://ydb-platform.github.io/ydb-python-sdk

Core Imports

import ydb

Common imports for specific functionality:

# Core driver and connection
from ydb import Driver, DriverConfig

# Table operations and sessions
from ydb import SessionPool, Session, TxContext

# Data types
from ydb import types

# Async operations
import ydb.aio as ydb_aio

# Schema operations
from ydb import SchemeClient

# Query service
from ydb import QuerySessionPool

# Topic operations
from ydb import TopicClient

# Authentication
from ydb import AnonymousCredentials, StaticCredentials
import ydb.iam

# SQLAlchemy integration
import ydb.sqlalchemy

# DB-API 2.0 interface
import ydb.dbapi

Basic Usage

Simple Connection and Query

import ydb

# Create a driver from an endpoint and a database path
driver = ydb.Driver(
    endpoint="grpc://localhost:2136",
    database="/local",
    credentials=ydb.AnonymousCredentials()
)

# Wait for driver to be ready
driver.wait(fail_fast=True, timeout=5)

# Create session pool
session_pool = ydb.SessionPool(driver)

def execute_query(session):
    # Execute YQL query
    result_sets = session.transaction(ydb.SerializableReadWrite()).execute(
        "SELECT 1 AS value;",
        commit_tx=True
    )
    
    for result_set in result_sets:
        for row in result_set.rows:
            print(f"Value: {row.value}")

# Execute query using session pool
session_pool.retry_operation_sync(execute_query)

# Clean up
session_pool.stop()
driver.stop()
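
The example above reaches `session_pool.stop()` and `driver.stop()` only when the query succeeds. A minimal sketch of a cleanup helper follows. The name `stopping` is hypothetical, and the helper assumes only that the wrapped object has a `stop()` method, as the driver and session pool above do.

```python
from contextlib import contextmanager


@contextmanager
def stopping(resource):
    """Yield `resource` and call its stop() on exit, even after an exception."""
    try:
        yield resource
    finally:
        resource.stop()


# Usage sketch (needs the ydb package and a reachable database):
#
#   with stopping(ydb.Driver(endpoint="grpc://localhost:2136",
#                            database="/local")) as driver:
#       driver.wait(fail_fast=True, timeout=5)
#       with stopping(ydb.SessionPool(driver)) as pool:
#           pool.retry_operation_sync(execute_query)
```

Nesting the two `with` blocks stops the pool before the driver, the same order as the manual cleanup above.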

Table Operations

import ydb

# Create table (reuses session_pool from the previous example)
def create_table(session):
    session.create_table(
        '/local/test_table',
        ydb.TableDescription()
        .with_column(ydb.Column('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)))
        .with_column(ydb.Column('name', ydb.OptionalType(ydb.PrimitiveType.Utf8)))
        .with_primary_key('id')
    )

session_pool.retry_operation_sync(create_table)
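
Once the table exists, rows are usually written with a parameterized query. The sketch below assumes the table service pattern of preparing a query with `session.prepare` and passing parameters keyed by their `$name`; `upsert_row` is a hypothetical helper, and the relative table name assumes the database root is `/local`.

```python
UPSERT_QUERY = """
DECLARE $id AS Uint64;
DECLARE $name AS Utf8;
UPSERT INTO test_table (id, name) VALUES ($id, $name);
"""


def upsert_row(pool, row_id, name):
    """Upsert one row through a session pool, retrying on transient errors."""

    def callee(session):
        # Prepare the query so typed parameters can be passed by name.
        prepared = session.prepare(UPSERT_QUERY)
        session.transaction().execute(
            prepared,
            {"$id": row_id, "$name": name},
            commit_tx=True,  # single-statement transaction: commit immediately
        )

    return pool.retry_operation_sync(callee)


# Usage sketch: upsert_row(session_pool, 1, "alice")
```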

Architecture

The YDB Python SDK is organized around several key components:

  • Driver: Manages connections to YDB cluster and handles endpoint discovery
  • Session Pool: Manages database sessions with automatic retry and failover
  • Query Execution: Table service for traditional operations and Query service for modern SQL
  • Type System: Complete type mapping between Python and YDB data types
  • Authentication: Multiple credential providers including OAuth2, JWT, and service accounts
  • Async Support: Full async/await interface through ydb.aio module
  • Integrations: SQLAlchemy dialect and DB-API 2.0 compliant interface

Capabilities

Driver and Connection Management

Core connection functionality including driver configuration, endpoint discovery, credential management, and connection lifecycle.

class Driver:
    def __init__(self, endpoint: str, database: str, credentials: Credentials = None, **kwargs): ...
    def wait(self, fail_fast: bool = True, timeout: float = None) -> bool: ...
    def stop(self, timeout: float = None): ...

class DriverConfig:
    def __init__(self, endpoint: str, database: str, credentials: Credentials = None, **kwargs): ...

def construct_driver(config: DriverConfig) -> Driver: ...
def construct_driver_from_string(connection_string: str, **kwargs) -> Driver: ...
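
To illustrate what a connection string carries, here is a small stdlib-only sketch that splits one into the `endpoint` and `database` values the `Driver` constructor takes. It assumes the `grpc://host:port/?database=/path` form; `split_connection_string` is a hypothetical helper and not part of the SDK.

```python
from urllib.parse import parse_qs, urlsplit


def split_connection_string(connection_string):
    """Split 'grpc://host:port/?database=/path' into (endpoint, database)."""
    parts = urlsplit(connection_string)
    if parts.scheme not in ("grpc", "grpcs"):
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    databases = parse_qs(parts.query).get("database")
    if not databases:
        raise ValueError("connection string has no 'database' parameter")
    return f"{parts.scheme}://{parts.netloc}", databases[0]


endpoint, database = split_connection_string(
    "grpc://localhost:2136/?database=/local"
)
```

The two returned values match the arguments used in the Basic Usage example.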

Authentication and Credentials

Complete authentication system supporting anonymous access, static tokens, OAuth2 token exchange, JWT authentication, service account credentials, and Yandex Cloud IAM integration.

class AnonymousCredentials(Credentials): ...

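class StaticCredentials(Credentials):
    def __init__(self, driver_config: DriverConfig, user: str, password: str = "", **kwargs): ...

class AccessTokenCredentials(Credentials):
    def __init__(self, token: str): ...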

class OAuth2TokenExchangeCredentials(Credentials):
    def __init__(self, token_endpoint: str, **kwargs): ...

def credentials_from_env_variables() -> Credentials: ...
def default_credentials(credentials: Credentials = None) -> Credentials: ...
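
The sketch below illustrates how environment-driven credential selection can work. The variable names and their precedence are assumptions based on YDB's environment-variable convention and should be checked against the SDK version in use; `pick_credentials_kind` is a hypothetical helper that only reports which kind would be chosen.

```python
import os


def pick_credentials_kind(env=None):
    """Return the credential kind implied by YDB_* environment variables."""
    env = os.environ if env is None else env
    # Assumed precedence: key file, anonymous, metadata, access token.
    if env.get("YDB_SERVICE_ACCOUNT_KEY_FILE_CREDENTIALS"):
        return "service-account-key-file"
    if env.get("YDB_ANONYMOUS_CREDENTIALS") == "1":
        return "anonymous"
    if env.get("YDB_METADATA_CREDENTIALS") == "1":
        return "metadata"
    if env.get("YDB_ACCESS_TOKEN_CREDENTIALS"):
        return "access-token"
    return "metadata"  # assumed fallback when nothing is set


kind = pick_credentials_kind({"YDB_ANONYMOUS_CREDENTIALS": "1"})
```

Passing the environment as a mapping keeps the selection logic testable without touching the real process environment.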

Table Operations and Sessions

Comprehensive table operations including session management, transaction handling, query execution, table creation/modification, bulk operations, and point reads.

class SessionPool:
    def __init__(self, driver: Driver, size: int = None): ...
    def retry_operation_sync(self, callee: callable, *args, **kwargs): ...
    def acquire(self, timeout: float = None) -> Session: ...

class Session:
    def execute_query(self, query: str, parameters: dict = None, **kwargs): ...
    def create_table(self, path: str, table_description: TableDescription, **kwargs): ...
    def transaction(self, tx_mode: TxMode = None) -> TxContext: ...

class TxContext:
    def execute(self, query: str, parameters: dict = None, commit_tx: bool = False): ...
    def commit(self): ...
    def rollback(self): ...
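
When several statements must succeed or fail together, the transaction can be committed explicitly instead of passing `commit_tx=True`. A minimal sketch using only the `TxContext` methods listed above; `run_in_transaction` is a hypothetical helper, and in practice it would be called from inside a function passed to `retry_operation_sync`.

```python
def run_in_transaction(session, queries):
    """Execute queries in one transaction; commit on success, roll back on error."""
    tx = session.transaction()
    try:
        # Each execute joins the same transaction because commit_tx is left False.
        results = [tx.execute(query) for query in queries]
        tx.commit()
        return results
    except Exception:
        tx.rollback()
        raise
```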

Data Types and Conversion

Complete type system mapping Python types to YDB types, including primitives, optionals, containers, and advanced types like decimals and variants.

class PrimitiveType:
    Bool: Type
    Int8: Type
    Uint8: Type
    Int16: Type
    Uint16: Type
    Int32: Type
    Uint32: Type
    Int64: Type
    Uint64: Type
    Float: Type
    Double: Type
    String: Type
    Utf8: Type
    Yson: Type
    Json: Type
    Uuid: Type
    Date: Type
    Datetime: Type
    Timestamp: Type
    Interval: Type

class OptionalType:
    def __init__(self, optional_type: Type): ...

class ListType:
    def __init__(self, item_type: Type): ...

class TupleType:
    def add_element(self, element_type: Type) -> 'TupleType': ...

class StructType:
    def add_member(self, name: str, member_type: Type) -> 'StructType': ...

class DictType:
    def __init__(self, key_type: Type, payload_type: Type): ...
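
Python's `int` is unbounded, while the YDB integer primitives listed above have fixed widths. The following stdlib-only sketch checks a value against the range implied by a type name before it is sent as a parameter; `INTEGER_RANGES` and `fits` are hypothetical names, not SDK functions.

```python
def _int_range(bits, signed):
    """Return the inclusive (low, high) range of a fixed-width integer."""
    if signed:
        return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1
    return 0, 2 ** bits - 1


# Maps names such as "Int8" or "Uint64" to their inclusive value range.
INTEGER_RANGES = {
    f"{'Int' if signed else 'Uint'}{bits}": _int_range(bits, signed)
    for bits in (8, 16, 32, 64)
    for signed in (True, False)
}


def fits(type_name, value):
    """Return True if `value` is an int inside the range of `type_name`."""
    low, high = INTEGER_RANGES[type_name]
    # bool is a subclass of int in Python, so it is excluded explicitly.
    return isinstance(value, int) and not isinstance(value, bool) and low <= value <= high
```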

Query Service Operations

Modern query interface supporting session management, transaction execution, result streaming, and advanced query options with the new Query service.

class QuerySessionPool:
    def __init__(self, driver: Driver, **kwargs): ...
    def execute_with_retries(self, query: str, parameters: dict = None, **kwargs): ...

class QuerySession:
    def execute_query(self, query: str, **kwargs): ...
    def transaction(self, tx_settings: TxSettings = None) -> QueryTransaction: ...

class QueryTransaction:
    def execute(self, query: str, parameters: dict = None): ...
    def commit(self): ...
    def rollback(self): ...
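
For one-off queries, a pool-level call with built-in retries is often enough. The sketch below assumes a pool object exposing `execute_with_retries(query, parameters)` that returns a list of result sets with a `rows` attribute, as `ydb.QuerySessionPool` does in recent SDK releases; `fetch_rows` is a hypothetical helper.

```python
def fetch_rows(pool, query, parameters=None):
    """Run one query with retries and return the rows of all result sets."""
    result_sets = pool.execute_with_retries(query, parameters)
    return [row for result_set in result_sets for row in result_set.rows]


# Usage sketch (needs the ydb package and a reachable database):
#
#   pool = ydb.QuerySessionPool(driver)
#   for row in fetch_rows(pool, "SELECT 1 AS value;"):
#       print(row.value)
```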

Async Operations

Full async/await interface providing asynchronous versions of all core functionality through the ydb.aio module.

import ydb.aio as ydb_aio

class Driver:
    async def __aenter__(self) -> 'Driver': ...
    async def __aexit__(self, *args): ...
    async def wait(self, fail_fast: bool = True, timeout: float = None) -> bool: ...

class SessionPool:
    async def acquire(self, timeout: float = None) -> Session: ...
    async def retry_operation(self, callee: callable, *args, **kwargs): ...

class Session:
    async def execute_query(self, query: str, parameters: dict = None, **kwargs): ...
    async def create_table(self, path: str, table_description: TableDescription, **kwargs): ...
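
The async pool makes it straightforward to run independent queries concurrently. A minimal sketch, assuming a pool with the `retry_operation` coroutine listed above and sessions whose `transaction().execute(...)` is awaitable; `run_queries` is a hypothetical helper.

```python
import asyncio


async def run_queries(pool, queries):
    """Run each query as its own retried operation and gather the results."""

    async def run_one(query):
        async def callee(session):
            return await session.transaction().execute(query, commit_tx=True)

        return await pool.retry_operation(callee)

    # gather preserves the order of `queries` in the returned list.
    return await asyncio.gather(*(run_one(query) for query in queries))
```

Each query gets its own session and transaction, so one failing query is retried without affecting the others.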

Schema Operations

Database schema management including directory operations, table schema inspection, permissions management, and metadata operations.

class SchemeClient:
    def __init__(self, driver: Driver): ...
    def make_directory(self, path: str, **kwargs): ...
    def remove_directory(self, path: str, **kwargs): ...
    def list_directory(self, path: str, **kwargs): ...
    def describe_path(self, path: str, **kwargs): ...

class SchemeEntry:
    name: str
    type: SchemeEntryType
    def is_directory(self) -> bool: ...
    def is_table(self) -> bool: ...

class TableDescription:
    def with_column(self, column: Column) -> 'TableDescription': ...
    def with_primary_key(self, *key_names: str) -> 'TableDescription': ...
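
Directory listing composes naturally into a recursive walk. The sketch below assumes that `list_directory` returns an object with a `children` list and that each entry exposes `name`, `is_directory()`, and `is_table()` as in the SDK's `SchemeEntry`; `walk_tables` is a hypothetical helper.

```python
def walk_tables(scheme_client, root):
    """Yield the full path of every table under `root`, depth first."""
    for entry in scheme_client.list_directory(root).children:
        path = f"{root.rstrip('/')}/{entry.name}"
        if entry.is_directory():
            yield from walk_tables(scheme_client, path)
        elif entry.is_table():
            yield path


# Usage sketch: for path in walk_tables(driver.scheme_client, "/local"): ...
```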

Topic Operations

Streaming data operations including topic creation, message publishing, message consuming, and topic administration.

class TopicClient:
    def __init__(self, driver: Driver): ...

class TopicWriter:
    def __init__(self, driver: Driver, topic_path: str, **kwargs): ...
    def write(self, messages: List[ProducerMessage]): ...
    def flush(self): ...

class TopicReader:
    def __init__(self, driver: Driver, **kwargs): ...
    def receive_message(self) -> Message: ...
    def commit(self, message: Message): ...

class ProducerMessage:
    def __init__(self, data: bytes, **kwargs): ...
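
A common consumer pattern is to commit a message only after it has been processed, which gives at-least-once delivery. The sketch below relies only on the `receive_message` and `commit` methods listed above; `consume` and `handle` are hypothetical names.

```python
def consume(reader, handle, limit):
    """Receive up to `limit` messages, committing each one after handling it."""
    for _ in range(limit):
        message = reader.receive_message()
        handle(message)         # if this raises, the message stays uncommitted
        reader.commit(message)  # acknowledge only after successful handling
```

If `handle` raises, the loop stops before `commit`, so the same message can be delivered again to the next reader session.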

Error Handling and Retries

Comprehensive error handling with detailed error hierarchies, retry strategies, backoff configurations, and operation result processing.

class Error(Exception):
    status: int
    message: str
    issues: List[Issue]

class BadRequest(Error): ...
class Unauthorized(Error): ...
class NotFound(Error): ...
class AlreadyExists(Error): ...
class Aborted(Error): ...
class Unavailable(Error): ...

class RetrySettings:
    def __init__(self, max_retries: int = 10, **kwargs): ...

class BackoffSettings:
    def __init__(self, ceiling: int = 6, slot_duration: float = 0.001, uncertain_ratio: float = 0.5): ...

def retry_operation_sync(callee: callable, retry_settings: RetrySettings = None, *args, **kwargs): ...
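
To make the retry-with-backoff idea concrete, here is a generic stdlib-only sketch of capped exponential backoff. It is not the SDK's implementation: `backoff_delay`, `retry`, and the `base` and `cap` values are hypothetical, and real retry logic usually adds random jitter.

```python
import time


def backoff_delay(attempt, base=0.05, cap=10.0):
    """Delay in seconds before retry number `attempt` (0-based), capped at `cap`."""
    return min(cap, base * 2 ** attempt)


def retry(operation, is_retryable, max_retries=10, sleep=time.sleep):
    """Call `operation` until it succeeds or the retry budget is exhausted."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except Exception as error:
            # Give up on non-retryable errors or when no retries are left.
            if attempt == max_retries or not is_retryable(error):
                raise
            sleep(backoff_delay(attempt))
```

Injecting `sleep` keeps the loop testable without real waiting. With the SDK, `retry_operation_sync` plays the role of `retry` and decides retryability from the error type.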

SQLAlchemy Integration

SQLAlchemy dialect for YDB enabling ORM usage, custom YDB types, connection management, and standard SQLAlchemy operations.

import ydb.sqlalchemy

def register_dialect(): ...

class UInt32(Integer): ...
class UInt64(Integer): ...
class UInt8(Integer): ...

# Connection string format
# ydb://endpoint/database?param=value

DB-API 2.0 Interface

Standard Python DB-API 2.0 compliant interface providing familiar database connectivity patterns.

import ydb.dbapi

def connect(endpoint: str, database: str, **kwargs) -> Connection: ...

class Connection:
    def cursor(self) -> Cursor: ...
    def commit(self): ...
    def rollback(self): ...
    def close(self): ...

class Cursor:
    def execute(self, query: str, parameters: tuple = None): ...
    def executemany(self, query: str, seq_of_parameters: List[tuple]): ...
    def fetchone(self) -> tuple: ...
    def fetchall(self) -> List[tuple]: ...
    def fetchmany(self, size: int = None) -> List[tuple]: ...
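
Because the interface follows DB-API 2.0, code written against the generic cursor pattern should carry over. The sketch below uses only the `cursor`, `execute`, and `fetchall` calls listed above plus the cursor `close()` that DB-API 2.0 specifies; `fetch_all` is a hypothetical helper.

```python
from contextlib import closing


def fetch_all(connection, query):
    """Run `query` on a DB-API 2.0 connection and return every row."""
    # closing() calls cursor.close() even if execute or fetchall raises.
    with closing(connection.cursor()) as cursor:
        cursor.execute(query)
        return cursor.fetchall()


# Usage sketch (needs the ydb package and a reachable database):
#
#   connection = ydb.dbapi.connect(endpoint="grpc://localhost:2136",
#                                  database="/local")
#   rows = fetch_all(connection, "SELECT 1 AS value;")
#   connection.close()
```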

Scripting Operations

YQL script execution and management for running complex analytical queries and operations outside of table service.

class ScriptingClient:
    def __init__(self, driver: Driver): ...
    def execute_yql_script(self, yql_script: str, **kwargs): ...
    def explain_yql_script(self, yql_script: str, **kwargs): ...

class YqlQueryResult:
    result_sets: List[ResultSet]
    status: int

class YqlExplainResult:
    plan: str
    ast: str

Operation Management

Management of long-running operations including export/import operations and other asynchronous tasks.

class Operation:
    def __init__(self, operation_id: str, driver: Driver): ...
    def cancel(self): ...
    def forget(self): ...
    def get(self): ...

class OperationClient:
    def __init__(self, driver: Driver): ...
    def cancel_operation(self, operation: Operation): ...
    def forget_operation(self, operation: Operation): ...
    def get_operation(self, operation: Operation): ...

Export and Import Operations

Data export to external systems (S3, Yandex Object Storage) and import from external sources.

class ExportToS3Settings:
    def __init__(self, endpoint: str, bucket: str, **kwargs): ...

class ExportToS3Operation(Operation):
    def __init__(self, driver: Driver, source_path: str, settings: ExportToS3Settings): ...

class ImportFromS3Settings:
    def __init__(self, endpoint: str, bucket: str, **kwargs): ...

class ImportFromS3Operation(Operation):
    def __init__(self, driver: Driver, destination_path: str, settings: ImportFromS3Settings): ...

class ExportProgress:
    UNSPECIFIED: int
    PREPARING: int
    TRANSFER_DATA: int
    DONE: int
    CANCELLATION: int
    CANCELLED: int

class ImportProgress:
    UNSPECIFIED: int
    PREPARING: int
    TRANSFER_DATA: int
    BUILD_INDEXES: int
    DONE: int
    CANCELLATION: int
    CANCELLED: int
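
Export and import run as long-running operations, so callers typically poll until a terminal progress state is reached. The sketch below is generic: `get_progress` is a hypothetical callable that returns the current state name (for example by inspecting the result of `Operation.get()`), and the terminal states are taken from the progress listings above.

```python
import time

# Progress names after which polling can stop.
TERMINAL = frozenset({"DONE", "CANCELLED"})


def wait_until_finished(get_progress, interval=1.0, timeout=60.0,
                        sleep=time.sleep, clock=time.monotonic):
    """Poll `get_progress` until a terminal state is seen or `timeout` elapses."""
    deadline = clock() + timeout
    while True:
        progress = get_progress()
        if progress in TERMINAL:
            return progress
        if clock() >= deadline:
            raise TimeoutError(f"still {progress} after {timeout} s")
        sleep(interval)
```

Injecting `sleep` and `clock` keeps the loop testable without real waiting.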

Retry Configuration

Configurable retry logic and backoff strategies for handling transient failures.

class RetrySettings:
    def __init__(self, max_retries: int = 10, **kwargs): ...

class BackoffSettings:
    def __init__(self, ceiling: int = 6, slot_duration: float = 0.001, uncertain_ratio: float = 0.5): ...

def retry_operation_sync(callee: callable, retry_settings: RetrySettings = None, *args, **kwargs): ...
def retry_operation_async(callee: callable, retry_settings: RetrySettings = None, *args, **kwargs): ...

Tracing Support

Distributed tracing integration for monitoring and debugging database operations.

class Tracer:
    def __init__(self, **kwargs): ...

class TraceLevel:
    OFF: int
    ERROR: int
    WARN: int
    INFO: int
    DEBUG: int
    TRACE: int

Global Settings

Global configuration options affecting SDK behavior across all operations.

def global_allow_truncated_result(enabled: bool): ...
def global_allow_split_transactions(enabled: bool): ...

Common Types

# Transaction modes (instances are passed to Session.transaction)
class SerializableReadWrite: ...
class OnlineReadOnly: ...
class StaleReadOnly: ...
class SnapshotReadOnly: ...

# Time units
class Unit:
    SECONDS: Unit
    MILLISECONDS: Unit
    MICROSECONDS: Unit
    NANOSECONDS: Unit

# Schema entry types  
class SchemeEntryType:
    DIRECTORY: SchemeEntryType
    TABLE: SchemeEntryType
    TOPIC: SchemeEntryType
    DATABASE: SchemeEntryType

# RPC compression options
class RPCCompression:
    NoCompression: int
    Deflate: int
    Gzip: int