CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ydb

Officially supported Python client for YDB distributed SQL database

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

SecuritybySnyk

Pending

The risk profile of this skill

Overview
Eval results
Files

index.mddocs/

YDB Python SDK

The official Python client for YDB (Yandex Database), a distributed SQL database. The SDK provides comprehensive functionality for table operations, query execution, transaction management, authentication, async operations, topic operations for streaming data, and integrations with SQLAlchemy and DB-API 2.0.

Package Information

  • Package Name: ydb
  • Language: Python
  • Installation: pip install ydb
  • Repository: https://github.com/ydb-platform/ydb-python-sdk
  • Documentation: https://ydb-platform.github.io/ydb-python-sdk

Core Imports

import ydb

Common patterns for specific functionality:

# Core driver and connection
from ydb import Driver, DriverConfig

# Table operations and sessions
from ydb import SessionPool, Session, TxContext

# Data types
from ydb import types

# Async operations
import ydb.aio as ydb_aio

# Schema operations
from ydb import SchemeClient

# Query service
from ydb import QueryService

# Topic operations
from ydb import TopicClient

# Authentication
from ydb import AnonymousCredentials, StaticCredentials
import ydb.iam

# SQLAlchemy integration
import ydb.sqlalchemy

# DB-API 2.0 interface
import ydb.dbapi

Basic Usage

Simple Connection and Query

import ydb

# Create driver with connection string
driver = ydb.Driver(
    endpoint="grpc://localhost:2136",
    database="/local",
    credentials=ydb.AnonymousCredentials()
)

# Wait for driver to be ready
driver.wait(fail_fast=True, timeout=5)

# Create session pool
session_pool = ydb.SessionPool(driver)

def execute_query(session):
    # Execute YQL query
    result_sets = session.transaction(ydb.SerializableReadWrite()).execute(
        "SELECT 1 AS value;",
        commit_tx=True
    )
    
    for result_set in result_sets:
        for row in result_set.rows:
            print(f"Value: {row.value}")

# Execute query using session pool
session_pool.retry_operation_sync(execute_query)

# Clean up
session_pool.stop()
driver.stop()

Table Operations

import ydb

# Create table
def create_table(session):
    session.create_table(
        '/local/test_table',
        ydb.TableDescription()
        .with_column(ydb.TableColumn('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)))
        .with_column(ydb.TableColumn('name', ydb.OptionalType(ydb.PrimitiveType.Utf8)))
        .with_primary_key('id')
    )

session_pool.retry_operation_sync(create_table)

Architecture

The YDB Python SDK is organized around several key components:

  • Driver: Manages connections to YDB cluster and handles endpoint discovery
  • Session Pool: Manages database sessions with automatic retry and failover
  • Query Execution: Table service for traditional operations and Query service for modern SQL
  • Type System: Complete type mapping between Python and YDB data types
  • Authentication: Multiple credential providers including OAuth2, JWT, and service accounts
  • Async Support: Full async/await interface through ydb.aio module
  • Integrations: SQLAlchemy dialect and DB-API 2.0 compliant interface

Capabilities

Driver and Connection Management

Core connection functionality including driver configuration, endpoint discovery, credential management, and connection lifecycle.

class Driver:
    def __init__(self, endpoint: str, database: str, credentials: Credentials = None, **kwargs): ...
    def wait(self, fail_fast: bool = True, timeout: float = None) -> bool: ...
    def stop(self, timeout: float = None): ...

class DriverConfig:
    def __init__(self, endpoint: str, database: str, credentials: Credentials = None, **kwargs): ...

def construct_driver(config: DriverConfig) -> Driver: ...
def construct_driver_from_string(connection_string: str, **kwargs) -> Driver: ...

Driver and Connection Management

Authentication and Credentials

Complete authentication system supporting anonymous access, static tokens, OAuth2 token exchange, JWT authentication, service account credentials, and Yandex Cloud IAM integration.

class AnonymousCredentials(Credentials): ...

class StaticCredentials(Credentials):
    def __init__(self, token: str): ...

class OAuth2TokenExchangeCredentials(Credentials):
    def __init__(self, token_endpoint: str, **kwargs): ...

def credentials_from_env_variables() -> Credentials: ...
def default_credentials(credentials: Credentials = None) -> Credentials: ...

Authentication and Credentials

Table Operations and Sessions

Comprehensive table operations including session management, transaction handling, query execution, table creation/modification, bulk operations, and point reads.

class SessionPool:
    def __init__(self, driver: Driver, size: int = None): ...
    def retry_operation_sync(self, callee: callable, *args, **kwargs): ...
    def acquire(self, timeout: float = None) -> Session: ...

class Session:
    def execute_query(self, query: str, parameters: dict = None, **kwargs): ...
    def create_table(self, path: str, table_description: TableDescription, **kwargs): ...
    def transaction(self, tx_mode: TxMode = None) -> TxContext: ...

class TxContext:
    def execute(self, query: str, parameters: dict = None, commit_tx: bool = False): ...
    def commit(self): ...
    def rollback(self): ...

Table Operations and Sessions

Data Types and Conversion

Complete type system mapping Python types to YDB types, including primitives, optionals, containers, and advanced types like decimals and variants.

class PrimitiveType:
    Bool: Type
    Int8: Type
    Uint8: Type
    Int16: Type
    Uint16: Type
    Int32: Type
    Uint32: Type
    Int64: Type
    Uint64: Type
    Float: Type
    Double: Type
    String: Type
    Utf8: Type
    Yson: Type
    Json: Type
    Uuid: Type
    Date: Type
    Datetime: Type
    Timestamp: Type
    Interval: Type

def Optional(item_type: Type) -> OptionalType: ...
def List(item_type: Type) -> ListType: ...
def Tuple(*item_types: Type) -> TupleType: ...
def Struct(**kwargs: Type) -> StructType: ...
def Dict(key_type: Type, value_type: Type) -> DictType: ...

Data Types and Conversion

Query Service Operations

Modern query interface supporting session management, transaction execution, result streaming, and advanced query options with the new Query service.

class QueryService:
    def __init__(self, driver: Driver): ...

class QuerySession:
    def execute_query(self, query: str, **kwargs): ...
    def transaction(self, tx_settings: TxSettings = None) -> QueryTransaction: ...

class QueryTransaction:
    def execute(self, query: str, parameters: dict = None): ...
    def commit(self): ...
    def rollback(self): ...

Query Service Operations

Async Operations

Full async/await interface providing asynchronous versions of all core functionality through the ydb.aio module.

import ydb.aio as ydb_aio

class Driver:
    async def __aenter__(self) -> 'Driver': ...
    async def __aexit__(self, *args): ...
    async def wait(self, fail_fast: bool = True, timeout: float = None) -> bool: ...

class SessionPool:
    async def acquire(self, timeout: float = None) -> Session: ...
    async def retry_operation(self, callee: callable, *args, **kwargs): ...

class Session:
    async def execute_query(self, query: str, parameters: dict = None, **kwargs): ...
    async def create_table(self, path: str, table_description: TableDescription, **kwargs): ...

Async Operations

Schema Operations

Database schema management including directory operations, table schema inspection, permissions management, and metadata operations.

class SchemeClient:
    def __init__(self, driver: Driver): ...
    def make_directory(self, path: str, **kwargs): ...
    def remove_directory(self, path: str, **kwargs): ...
    def list_directory(self, path: str, **kwargs): ...
    def describe_path(self, path: str, **kwargs): ...

class SchemeEntry:
    name: str
    type: SchemeEntryType
    is_directory: bool
    is_table: bool

class TableDescription:
    def with_column(self, column: TableColumn) -> 'TableDescription': ...
    def with_primary_key(self, *key_names: str) -> 'TableDescription': ...

Schema Operations

Topic Operations

Streaming data operations including topic creation, message publishing, message consuming, and topic administration.

class TopicClient:
    def __init__(self, driver: Driver): ...

class TopicWriter:
    def __init__(self, driver: Driver, topic_path: str, **kwargs): ...
    def write(self, messages: List[ProducerMessage]): ...
    def flush(self): ...

class TopicReader:
    def __init__(self, driver: Driver, **kwargs): ...
    def receive_message(self) -> Message: ...
    def commit(self, message: Message): ...

class ProducerMessage:
    def __init__(self, data: bytes, **kwargs): ...

Topic Operations

Error Handling and Retries

Comprehensive error handling with detailed error hierarchies, retry strategies, backoff configurations, and operation result processing.

class Error(Exception):
    status: int
    message: str
    issues: List[Issue]

class RetryableError(Error): ...
class BadRequestError(Error): ...
class UnauthorizedError(Error): ...
class NotFoundError(Error): ...
class AlreadyExistsError(Error): ...

class RetrySettings:
    def __init__(self, max_retries: int = 10, **kwargs): ...

class BackoffSettings:
    def __init__(self, max_backoff: float = 32.0, **kwargs): ...

def retry_operation_sync(callee: callable, retry_settings: RetrySettings = None, *args, **kwargs): ...

Error Handling and Retries

SQLAlchemy Integration

SQLAlchemy dialect for YDB enabling ORM usage, custom YDB types, connection management, and standard SQLAlchemy operations.

import ydb.sqlalchemy

def register_dialect(): ...

class UInt32(Integer): ...
class UInt64(Integer): ...
class UInt8(Integer): ...

# Connection string format
# ydb://endpoint/database?param=value

SQLAlchemy Integration

DB-API 2.0 Interface

Standard Python DB-API 2.0 compliant interface providing familiar database connectivity patterns.

import ydb.dbapi

def connect(endpoint: str, database: str, **kwargs) -> Connection: ...

class Connection:
    def cursor(self) -> Cursor: ...
    def commit(self): ...
    def rollback(self): ...
    def close(self): ...

class Cursor:
    def execute(self, query: str, parameters: tuple = None): ...
    def executemany(self, query: str, seq_of_parameters: List[tuple]): ...
    def fetchone(self) -> tuple: ...
    def fetchall(self) -> List[tuple]: ...
    def fetchmany(self, size: int = None) -> List[tuple]: ...

DB-API 2.0 Interface

Scripting Operations

YQL script execution and management for running complex analytical queries and operations outside of table service.

class ScriptingClient:
    def __init__(self, driver: Driver): ...
    def execute_yql_script(self, yql_script: str, **kwargs): ...
    def explain_yql_script(self, yql_script: str, **kwargs): ...

class YqlQueryResult:
    result_sets: List[ResultSet]
    status: int

class YqlExplainResult:
    plan: str
    ast: str

Operation Management

Management of long-running operations including export/import operations and other asynchronous tasks.

class Operation:
    def __init__(self, operation_id: str, driver: Driver): ...
    def cancel(self): ...
    def forget(self): ...
    def get(self): ...

class OperationClient:
    def __init__(self, driver: Driver): ...
    def cancel_operation(self, operation: Operation): ...
    def forget_operation(self, operation: Operation): ...
    def get_operation(self, operation: Operation): ...

Export and Import Operations

Data export to external systems (S3, Yandex Object Storage) and import from external sources.

class ExportToS3Settings:
    def __init__(self, endpoint: str, bucket: str, **kwargs): ...

class ExportToS3Operation(Operation):
    def __init__(self, driver: Driver, source_path: str, settings: ExportToS3Settings): ...

class ImportFromS3Settings:
    def __init__(self, endpoint: str, bucket: str, **kwargs): ...

class ImportFromS3Operation(Operation):
    def __init__(self, driver: Driver, destination_path: str, settings: ImportFromS3Settings): ...

class ExportProgress:
    UNSPECIFIED: int
    PREPARING: int
    TRANSFER_DATA: int
    DONE: int
    CANCELLATION: int
    CANCELLED: int

class ImportProgress:
    UNSPECIFIED: int
    PREPARING: int
    TRANSFER_DATA: int
    BUILD_INDEXES: int
    DONE: int
    CANCELLATION: int
    CANCELLED: int

Retry Configuration

Configurable retry logic and backoff strategies for handling transient failures.

class RetrySettings:
    def __init__(self, max_retries: int = 10, **kwargs): ...

class BackoffSettings:
    def __init__(self, max_backoff: float = 32.0, **kwargs): ...

def retry_operation_sync(callee: callable, retry_settings: RetrySettings = None, *args, **kwargs): ...
def retry_operation_async(callee: callable, retry_settings: RetrySettings = None, *args, **kwargs): ...

Tracing Support

Distributed tracing integration for monitoring and debugging database operations.

class Tracer:
    def __init__(self, **kwargs): ...

class TraceLevel:
    OFF: int
    ERROR: int
    WARN: int
    INFO: int
    DEBUG: int
    TRACE: int

Global Settings

Global configuration options affecting SDK behavior across all operations.

def global_allow_truncated_result(enabled: bool): ...
def global_allow_split_transactions(enabled: bool): ...

Common Types

# Transaction modes
class TxMode:
    SERIALIZABLE_RW: TxMode
    ONLINE_RO: TxMode
    STALE_RO: TxMode
    SNAPSHOT_RO: TxMode

# Time units
class Unit:
    SECONDS: Unit
    MILLISECONDS: Unit
    MICROSECONDS: Unit
    NANOSECONDS: Unit

# Schema entry types  
class SchemeEntryType:
    DIRECTORY: SchemeEntryType
    TABLE: SchemeEntryType
    TOPIC: SchemeEntryType
    DATABASE: SchemeEntryType

# RPC compression options
class RPCCompression:
    NoCompression: int
    Deflate: int
    Gzip: int

docs

async-operations.md

authentication.md

data-types.md

dbapi-interface.md

driver-connection.md

error-handling.md

index.md

query-service.md

schema-operations.md

sqlalchemy-integration.md

table-operations.md

topic-operations.md

tile.json