CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ydb

Officially supported Python client for YDB distributed SQL database

Overview
Eval results
Files

schema-operations.mddocs/

Schema Operations

Database schema management including directory operations, table schema inspection, permissions management, and metadata operations.

Capabilities

Schema Client

The schema client provides operations for managing database structure, directories, and metadata.

class SchemeClient:
    def __init__(self, driver: Driver):
        """
        Create schema operations client.
        
        Args:
            driver (Driver): YDB driver instance
        """

    def make_directory(
        self,
        path: str,
        settings: MakeDirectorySettings = None
    ):
        """
        Create directory in the database.
        
        Args:
            path (str): Directory path to create
            settings (MakeDirectorySettings, optional): Directory creation settings
        """

    def remove_directory(
        self,
        path: str,
        settings: RemoveDirectorySettings = None
    ):
        """
        Remove directory from the database.
        
        Args:
            path (str): Directory path to remove
            settings (RemoveDirectorySettings, optional): Directory removal settings
        """

    def list_directory(
        self,
        path: str,
        settings: ListDirectorySettings = None
    ) -> Directory:
        """
        List contents of a directory.
        
        Args:
            path (str): Directory path to list
            settings (ListDirectorySettings, optional): Listing settings
            
        Returns:
            Directory: Directory information with child entries
        """

    def describe_path(
        self,
        path: str,
        settings: DescribePathSettings = None
    ) -> SchemeEntry:
        """
        Get detailed information about a path entry.
        
        Args:
            path (str): Path to describe
            settings (DescribePathSettings, optional): Description settings
            
        Returns:
            SchemeEntry: Path entry information
        """

    def modify_permissions(
        self,
        path: str,
        permissions: Permissions,
        settings: ModifyPermissionsSettings = None
    ):
        """
        Modify access permissions for a path.
        
        Args:
            path (str): Path to modify permissions for
            permissions (Permissions): Permission modifications
            settings (ModifyPermissionsSettings, optional): Permission settings
        """

class MakeDirectorySettings(BaseRequestSettings):
    def __init__(
        self,
        request_type: str = None,
        trace_id: str = None,
        timeout: float = None
    ):
        """
        Settings for directory creation operations.
        
        Args:
            request_type (str, optional): Request type for logging
            trace_id (str, optional): Request tracing identifier
            timeout (float, optional): Operation timeout
        """

class RemoveDirectorySettings(BaseRequestSettings):
    def __init__(
        self,
        request_type: str = None,
        trace_id: str = None,
        timeout: float = None
    ):
        """
        Settings for directory removal operations.
        
        Args:
            request_type (str, optional): Request type for logging
            trace_id (str, optional): Request tracing identifier
            timeout (float, optional): Operation timeout
        """

class ListDirectorySettings(BaseRequestSettings):
    def __init__(
        self,
        request_type: str = None,
        trace_id: str = None,
        timeout: float = None
    ):
        """
        Settings for directory listing operations.
        
        Args:
            request_type (str, optional): Request type for logging
            trace_id (str, optional): Request tracing identifier
            timeout (float, optional): Operation timeout
        """

class DescribePathSettings(BaseRequestSettings):
    def __init__(
        self,
        request_type: str = None,
        trace_id: str = None,
        timeout: float = None
    ):
        """
        Settings for path description operations.
        
        Args:
            request_type (str, optional): Request type for logging
            trace_id (str, optional): Request tracing identifier
            timeout (float, optional): Operation timeout
        """

Schema Entry Types

Enumeration of different entry types in the database schema.

class SchemeEntryType(enum.IntEnum):
    """
    Database entry types enumeration.
    """
    TYPE_UNSPECIFIED = 0
    DIRECTORY = 1
    TABLE = 2
    PERS_QUEUE_GROUP = 3
    DATABASE = 4
    RTMR_VOLUME = 5
    BLOCK_STORE_VOLUME = 6
    COORDINATION_NODE = 7
    COLUMN_STORE = 12
    COLUMN_TABLE = 13
    SEQUENCE = 15
    REPLICATION = 16
    TOPIC = 17
    EXTERNAL_TABLE = 18
    EXTERNAL_DATA_SOURCE = 19
    VIEW = 20
    RESOURCE_POOL = 21

    @staticmethod
    def is_table(entry: 'SchemeEntryType') -> bool:
        """
        Check if entry is a row table.
        
        Args:
            entry (SchemeEntryType): Entry type to check
            
        Returns:
            bool: True if entry is a row table
        """

    @staticmethod
    def is_any_table(entry: 'SchemeEntryType') -> bool:
        """
        Check if entry is any type of table.
        
        Args:
            entry (SchemeEntryType): Entry type to check
            
        Returns:
            bool: True if entry is any table type
        """

    @staticmethod
    def is_column_table(entry: 'SchemeEntryType') -> bool:
        """
        Check if entry is a column table.
        
        Args:
            entry (SchemeEntryType): Entry type to check
            
        Returns:
            bool: True if entry is a column table
        """

    @staticmethod
    def is_directory(entry: 'SchemeEntryType') -> bool:
        """
        Check if entry is a directory.
        
        Args:
            entry (SchemeEntryType): Entry type to check
            
        Returns:
            bool: True if entry is a directory
        """

    @staticmethod
    def is_topic(entry: 'SchemeEntryType') -> bool:
        """
        Check if entry is a topic.
        
        Args:
            entry (SchemeEntryType): Entry type to check
            
        Returns:
            bool: True if entry is a topic
        """

Schema Entries

Data structures representing database schema entries and their metadata.

class SchemeEntry:
    def __init__(
        self,
        name: str,
        owner: str,
        type: SchemeEntryType,
        effective_permissions: List[str] = None,
        permissions: List[Permissions] = None,
        size_bytes: int = None
    ):
        """
        Schema entry information.
        
        Args:
            name (str): Entry name
            owner (str): Entry owner
            type (SchemeEntryType): Entry type
            effective_permissions (List[str], optional): Effective permissions
            permissions (List[Permissions], optional): Permission settings
            size_bytes (int, optional): Entry size in bytes
        """

    @property
    def name(self) -> str:
        """Entry name."""

    @property
    def owner(self) -> str:
        """Entry owner."""

    @property
    def type(self) -> SchemeEntryType:
        """Entry type."""

    @property
    def effective_permissions(self) -> List[str]:
        """Effective permissions for the entry."""

    @property
    def permissions(self) -> List[Permissions]:
        """Permission settings for the entry."""

    @property
    def size_bytes(self) -> int:
        """Entry size in bytes."""

    @property
    def is_directory(self) -> bool:
        """True if entry is a directory."""

    @property
    def is_table(self) -> bool:
        """True if entry is a table."""

    @property
    def is_topic(self) -> bool:
        """True if entry is a topic."""

class Directory:
    def __init__(
        self,
        path: str,
        children: List[SchemeEntry] = None,
        self_entry: SchemeEntry = None
    ):
        """
        Directory information with child entries.
        
        Args:
            path (str): Directory path
            children (List[SchemeEntry], optional): Child entries
            self_entry (SchemeEntry, optional): Directory's own entry info
        """

    @property
    def path(self) -> str:
        """Directory path."""

    @property
    def children(self) -> List[SchemeEntry]:
        """Child entries in the directory."""

    @property
    def self_entry(self) -> SchemeEntry:
        """Directory's own schema entry."""

    def get_child(self, name: str) -> Optional[SchemeEntry]:
        """
        Get child entry by name.
        
        Args:
            name (str): Child entry name
            
        Returns:
            Optional[SchemeEntry]: Child entry or None if not found
        """

    def has_child(self, name: str) -> bool:
        """
        Check if directory has a child with given name.
        
        Args:
            name (str): Child entry name
            
        Returns:
            bool: True if child exists
        """

    def list_tables(self) -> List[SchemeEntry]:
        """
        Get all table entries in the directory.
        
        Returns:
            List[SchemeEntry]: Table entries
        """

    def list_directories(self) -> List[SchemeEntry]:
        """
        Get all subdirectory entries.
        
        Returns:
            List[SchemeEntry]: Directory entries
        """

    def list_topics(self) -> List[SchemeEntry]:
        """
        Get all topic entries in the directory.
        
        Returns:
            List[SchemeEntry]: Topic entries
        """

Permissions Management

Access control and permission management for database objects.

class Permissions:
    def __init__(
        self,
        subject: str = None,
        permission_names: List[str] = None
    ):
        """
        Permission settings for database objects.
        
        Args:
            subject (str, optional): Permission subject (user or group)
            permission_names (List[str], optional): List of permission names
        """

    @property
    def subject(self) -> str:
        """Permission subject."""

    @property
    def permission_names(self) -> List[str]:
        """List of permission names."""

    def add_permission(self, permission_name: str) -> 'Permissions':
        """
        Add permission to the list.
        
        Args:
            permission_name (str): Permission name to add
            
        Returns:
            Permissions: Self for method chaining
        """

    def remove_permission(self, permission_name: str) -> 'Permissions':
        """
        Remove permission from the list.
        
        Args:
            permission_name (str): Permission name to remove
            
        Returns:
            Permissions: Self for method chaining
        """

    def has_permission(self, permission_name: str) -> bool:
        """
        Check if permission is granted.
        
        Args:
            permission_name (str): Permission name to check
            
        Returns:
            bool: True if permission is granted
        """

class ModifyPermissionsSettings(BaseRequestSettings):
    def __init__(
        self,
        actions: List[PermissionAction] = None,
        request_type: str = None,
        trace_id: str = None,
        timeout: float = None
    ):
        """
        Settings for permission modification operations.
        
        Args:
            actions (List[PermissionAction], optional): Permission actions to perform
            request_type (str, optional): Request type for logging
            trace_id (str, optional): Request tracing identifier
            timeout (float, optional): Operation timeout
        """

class PermissionAction:
    def __init__(
        self,
        action_type: PermissionActionType,
        subject: str,
        permission_names: List[str]
    ):
        """
        Permission modification action.
        
        Args:
            action_type (PermissionActionType): Type of action (grant/revoke)
            subject (str): Permission subject
            permission_names (List[str]): Permissions to modify
        """

    @property
    def action_type(self) -> PermissionActionType:
        """Type of permission action."""

    @property
    def subject(self) -> str:
        """Permission subject."""

    @property
    def permission_names(self) -> List[str]:
        """Permissions being modified."""

class PermissionActionType(enum.Enum):
    """Permission action types."""
    GRANT = "grant"
    REVOKE = "revoke"
    SET = "set"

Table Schema Information

Detailed table structure information retrieved through schema operations.

class TableDescription:
    def __init__(
        self,
        name: str = None,
        columns: List[TableColumn] = None,
        primary_key: List[str] = None,
        indexes: List[TableIndex] = None,
        ttl_settings: TtlSettings = None,
        storage_settings: StorageSettings = None,
        column_families: List[ColumnFamily] = None,
        attributes: Dict[str, str] = None,
        compaction_policy: CompactionPolicy = None,
        partitioning_settings: PartitioningSettings = None,
        key_bloom_filter: bool = None,
        read_replicas_settings: ReadReplicasSettings = None
    ):
        """
        Complete table description with structure and settings.
        
        Args:
            name (str, optional): Table name
            columns (List[TableColumn], optional): Table columns
            primary_key (List[str], optional): Primary key column names
            indexes (List[TableIndex], optional): Secondary indexes
            ttl_settings (TtlSettings, optional): Time-to-live settings
            storage_settings (StorageSettings, optional): Storage configuration
            column_families (List[ColumnFamily], optional): Column families
            attributes (Dict[str, str], optional): Table attributes
            compaction_policy (CompactionPolicy, optional): Compaction settings
            partitioning_settings (PartitioningSettings, optional): Partitioning configuration
            key_bloom_filter (bool, optional): Enable key bloom filter
            read_replicas_settings (ReadReplicasSettings, optional): Read replica settings
        """

    def with_column(self, column: TableColumn) -> 'TableDescription':
        """
        Add column to table description.
        
        Args:
            column (TableColumn): Column to add
            
        Returns:
            TableDescription: Self for method chaining
        """

    def with_primary_key(self, *key_names: str) -> 'TableDescription':
        """
        Set primary key columns.
        
        Args:
            *key_names (str): Primary key column names
            
        Returns:
            TableDescription: Self for method chaining
        """

    def with_index(self, index: TableIndex) -> 'TableDescription':
        """
        Add secondary index to table description.
        
        Args:
            index (TableIndex): Index to add
            
        Returns:
            TableDescription: Self for method chaining
        """

    def with_ttl(self, ttl_settings: TtlSettings) -> 'TableDescription':
        """
        Set time-to-live settings.
        
        Args:
            ttl_settings (TtlSettings): TTL configuration
            
        Returns:
            TableDescription: Self for method chaining
        """

    @property
    def columns(self) -> List[TableColumn]:
        """Table columns."""

    @property
    def primary_key(self) -> List[str]:
        """Primary key column names."""

    @property
    def indexes(self) -> List[TableIndex]:
        """Secondary indexes."""

    def get_column(self, name: str) -> Optional[TableColumn]:
        """
        Get column by name.
        
        Args:
            name (str): Column name
            
        Returns:
            Optional[TableColumn]: Column or None if not found
        """

    def has_column(self, name: str) -> bool:
        """
        Check if table has column with given name.
        
        Args:
            name (str): Column name
            
        Returns:
            bool: True if column exists
        """

class TableColumn:
    def __init__(
        self,
        name: str,
        type: Type,
        family: str = None,
        not_null: bool = False
    ):
        """
        Table column description.
        
        Args:
            name (str): Column name
            type (Type): Column YDB type
            family (str, optional): Column family name
            not_null (bool): Whether column is not nullable
        """

    @property
    def name(self) -> str:
        """Column name."""

    @property
    def type(self) -> Type:
        """Column YDB type."""

    @property
    def family(self) -> str:
        """Column family name."""

    @property
    def not_null(self) -> bool:
        """Whether column is not nullable."""

class TableIndex:
    def __init__(
        self,
        name: str,
        index_columns: List[str],
        data_columns: List[str] = None,
        global_index: bool = True,
        unique: bool = False
    ):
        """
        Table index description.
        
        Args:
            name (str): Index name
            index_columns (List[str]): Indexed column names
            data_columns (List[str], optional): Additional data columns
            global_index (bool): Whether index is global
            unique (bool): Whether index enforces uniqueness
        """

    @property
    def name(self) -> str:
        """Index name."""

    @property
    def index_columns(self) -> List[str]:
        """Indexed column names."""

    @property
    def data_columns(self) -> List[str]:
        """Additional data columns."""

    @property
    def global_index(self) -> bool:
        """Whether index is global."""

    @property
    def unique(self) -> bool:
        """Whether index enforces uniqueness."""

Schema Utility Functions

Convenience functions for common schema operations.

def make_directory(
    driver: Driver,
    path: str,
    settings: MakeDirectorySettings = None
):
    """
    Create directory using driver.
    
    Args:
        driver (Driver): YDB driver instance
        path (str): Directory path to create
        settings (MakeDirectorySettings, optional): Creation settings
    """

def remove_directory(
    driver: Driver,
    path: str,
    settings: RemoveDirectorySettings = None
):
    """
    Remove directory using driver.
    
    Args:
        driver (Driver): YDB driver instance
        path (str): Directory path to remove
        settings (RemoveDirectorySettings, optional): Removal settings
    """

def list_directory(
    driver: Driver,
    path: str,
    settings: ListDirectorySettings = None
) -> Directory:
    """
    List directory contents using driver.
    
    Args:
        driver (Driver): YDB driver instance
        path (str): Directory path to list
        settings (ListDirectorySettings, optional): Listing settings
        
    Returns:
        Directory: Directory information
    """

def describe_path(
    driver: Driver,
    path: str,
    settings: DescribePathSettings = None
) -> SchemeEntry:
    """
    Describe path entry using driver.
    
    Args:
        driver (Driver): YDB driver instance
        path (str): Path to describe
        settings (DescribePathSettings, optional): Description settings
        
    Returns:
        SchemeEntry: Path entry information
    """

Usage Examples

Directory Operations

import ydb

# Create driver and scheme client
driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
driver.wait(fail_fast=True)

scheme_client = ydb.SchemeClient(driver)

# Create directory hierarchy
scheme_client.make_directory("/local/app")
scheme_client.make_directory("/local/app/tables")
scheme_client.make_directory("/local/app/indexes")

# List directory contents
directory = scheme_client.list_directory("/local/app")
print(f"Directory: {directory.path}")

for entry in directory.children:
    print(f"  {entry.name} ({entry.type})")
    if entry.is_directory:
        print(f"    Directory owned by: {entry.owner}")
    elif entry.is_table:
        print(f"    Table size: {entry.size_bytes} bytes")

Schema Inspection

# Inspect table schema
def inspect_table_schema(driver, table_path):
    scheme_client = ydb.SchemeClient(driver)
    
    # Get table entry information
    entry = scheme_client.describe_path(table_path)
    print(f"Table: {entry.name}")
    print(f"Owner: {entry.owner}")
    print(f"Size: {entry.size_bytes} bytes")
    
    # Get detailed table description through session
    session_pool = ydb.SessionPool(driver)
    
    def get_table_description(session):
        return session.describe_table(table_path)
    
    table_desc = session_pool.retry_operation_sync(get_table_description)
    
    print("\nColumns:")
    for column in table_desc.columns:
        nullable = "NULL" if not column.not_null else "NOT NULL"
        print(f"  {column.name}: {column.type} {nullable}")
    
    print(f"\nPrimary Key: {', '.join(table_desc.primary_key)}")
    
    if table_desc.indexes:
        print("\nIndexes:")
        for index in table_desc.indexes:
            index_type = "UNIQUE" if index.unique else "NON-UNIQUE"
            print(f"  {index.name}: {index_type} on {', '.join(index.index_columns)}")

inspect_table_schema(driver, "/local/users")

Directory Tree Traversal

def traverse_directory_tree(scheme_client, path, level=0):
    """Recursively traverse and display directory tree."""
    indent = "  " * level
    
    try:
        directory = scheme_client.list_directory(path)
        print(f"{indent}{directory.self_entry.name}/")
        
        # Process child entries
        for entry in directory.children:
            if entry.is_directory:
                # Recursively traverse subdirectories
                child_path = f"{path}/{entry.name}".replace("//", "/")
                traverse_directory_tree(scheme_client, child_path, level + 1)
            else:
                # Display non-directory entries
                entry_type = "table" if entry.is_table else "topic" if entry.is_topic else str(entry.type)
                print(f"{indent}  {entry.name} ({entry_type})")
                
    except ydb.NotFoundError:
        print(f"{indent}[Directory not found: {path}]")

# Usage
traverse_directory_tree(scheme_client, "/local")

Permission Management

# Modify permissions for a table
def grant_table_permissions(scheme_client, table_path, user, permissions):
    """Grant permissions to user for a table."""
    
    permission_actions = [
        ydb.PermissionAction(
            action_type=ydb.PermissionActionType.GRANT,
            subject=user,
            permission_names=permissions
        )
    ]
    
    settings = ydb.ModifyPermissionsSettings(actions=permission_actions)
    
    scheme_client.modify_permissions(table_path, settings)
    print(f"Granted permissions {permissions} to {user} for {table_path}")

# Grant read/write permissions
grant_table_permissions(
    scheme_client,
    "/local/users",
    "user@domain.com",
    ["ydb.generic.read", "ydb.generic.write"]
)

# Check current permissions
entry = scheme_client.describe_path("/local/users")
print("Current permissions:")
for perm in entry.permissions:
    print(f"  {perm.subject}: {perm.permission_names}")

Schema Comparison

def compare_table_schemas(scheme_client, session_pool, table1_path, table2_path):
    """Compare schemas of two tables."""
    
    def get_table_info(session, table_path):
        desc = session.describe_table(table_path)
        return {
            "columns": {col.name: col.type for col in desc.columns},
            "primary_key": desc.primary_key,
            "indexes": {idx.name: idx.index_columns for idx in desc.indexes}
        }
    
    # Get both table descriptions
    table1_info = session_pool.retry_operation_sync(get_table_info, table1_path)
    table2_info = session_pool.retry_operation_sync(get_table_info, table2_path)
    
    # Compare columns
    print("Column differences:")
    all_columns = set(table1_info["columns"].keys()) | set(table2_info["columns"].keys())
    
    for col_name in sorted(all_columns):
        type1 = table1_info["columns"].get(col_name, "MISSING")
        type2 = table2_info["columns"].get(col_name, "MISSING")
        
        if type1 != type2:
            print(f"  {col_name}: {type1} vs {type2}")
    
    # Compare primary keys
    if table1_info["primary_key"] != table2_info["primary_key"]:
        print(f"Primary key difference:")
        print(f"  Table 1: {table1_info['primary_key']}")
        print(f"  Table 2: {table2_info['primary_key']}")
    
    # Compare indexes
    if table1_info["indexes"] != table2_info["indexes"]:
        print("Index differences:")
        all_indexes = set(table1_info["indexes"].keys()) | set(table2_info["indexes"].keys())
        
        for idx_name in sorted(all_indexes):
            idx1 = table1_info["indexes"].get(idx_name, "MISSING")
            idx2 = table2_info["indexes"].get(idx_name, "MISSING")
            
            if idx1 != idx2:
                print(f"  {idx_name}: {idx1} vs {idx2}")

# Usage
compare_table_schemas(scheme_client, session_pool, "/local/users", "/local/users_backup")

Type Definitions

# Common type aliases
SchemaPath = str
EntryName = str
PermissionName = str
Subject = str

# Permission collections
ReadPermissions = ["ydb.generic.read"]
WritePermissions = ["ydb.generic.write"]
AdminPermissions = ["ydb.generic.read", "ydb.generic.write", "ydb.generic.manage"]

# Entry type predicates
IsDirectory = Callable[[SchemeEntry], bool]
IsTable = Callable[[SchemeEntry], bool]
IsAnyTable = Callable[[SchemeEntry], bool]

Install with Tessl CLI

npx tessl i tessl/pypi-ydb

docs

async-operations.md

authentication.md

data-types.md

dbapi-interface.md

driver-connection.md

error-handling.md

index.md

query-service.md

schema-operations.md

sqlalchemy-integration.md

table-operations.md

topic-operations.md

tile.json