Officially supported Python client for YDB distributed SQL database
Database schema management including directory operations, table schema inspection, permissions management, and metadata operations.
The schema client provides operations for managing database structure, directories, and metadata.
class SchemeClient:
def __init__(self, driver: Driver):
"""
Create schema operations client.
Args:
driver (Driver): YDB driver instance
"""
def make_directory(
self,
path: str,
settings: MakeDirectorySettings = None
):
"""
Create directory in the database.
Args:
path (str): Directory path to create
settings (MakeDirectorySettings, optional): Directory creation settings
"""
def remove_directory(
self,
path: str,
settings: RemoveDirectorySettings = None
):
"""
Remove directory from the database.
Args:
path (str): Directory path to remove
settings (RemoveDirectorySettings, optional): Directory removal settings
"""
def list_directory(
self,
path: str,
settings: ListDirectorySettings = None
) -> Directory:
"""
List contents of a directory.
Args:
path (str): Directory path to list
settings (ListDirectorySettings, optional): Listing settings
Returns:
Directory: Directory information with child entries
"""
def describe_path(
self,
path: str,
settings: DescribePathSettings = None
) -> SchemeEntry:
"""
Get detailed information about a path entry.
Args:
path (str): Path to describe
settings (DescribePathSettings, optional): Description settings
Returns:
SchemeEntry: Path entry information
"""
def modify_permissions(
self,
path: str,
permissions: Permissions,
settings: ModifyPermissionsSettings = None
):
"""
Modify access permissions for a path.
Args:
path (str): Path to modify permissions for
permissions (Permissions): Permission modifications
settings (ModifyPermissionsSettings, optional): Permission settings
"""
class MakeDirectorySettings(BaseRequestSettings):
def __init__(
self,
request_type: str = None,
trace_id: str = None,
timeout: float = None
):
"""
Settings for directory creation operations.
Args:
request_type (str, optional): Request type for logging
trace_id (str, optional): Request tracing identifier
timeout (float, optional): Operation timeout
"""
class RemoveDirectorySettings(BaseRequestSettings):
def __init__(
self,
request_type: str = None,
trace_id: str = None,
timeout: float = None
):
"""
Settings for directory removal operations.
Args:
request_type (str, optional): Request type for logging
trace_id (str, optional): Request tracing identifier
timeout (float, optional): Operation timeout
"""
class ListDirectorySettings(BaseRequestSettings):
def __init__(
self,
request_type: str = None,
trace_id: str = None,
timeout: float = None
):
"""
Settings for directory listing operations.
Args:
request_type (str, optional): Request type for logging
trace_id (str, optional): Request tracing identifier
timeout (float, optional): Operation timeout
"""
class DescribePathSettings(BaseRequestSettings):
def __init__(
self,
request_type: str = None,
trace_id: str = None,
timeout: float = None
):
"""
Settings for path description operations.
Args:
request_type (str, optional): Request type for logging
trace_id (str, optional): Request tracing identifier
timeout (float, optional): Operation timeout
"""Enumeration of different entry types in the database schema.
class SchemeEntryType(enum.IntEnum):
"""
Database entry types enumeration.
"""
TYPE_UNSPECIFIED = 0
DIRECTORY = 1
TABLE = 2
PERS_QUEUE_GROUP = 3
DATABASE = 4
RTMR_VOLUME = 5
BLOCK_STORE_VOLUME = 6
COORDINATION_NODE = 7
COLUMN_STORE = 12
COLUMN_TABLE = 13
SEQUENCE = 15
REPLICATION = 16
TOPIC = 17
EXTERNAL_TABLE = 18
EXTERNAL_DATA_SOURCE = 19
VIEW = 20
RESOURCE_POOL = 21
@staticmethod
def is_table(entry: 'SchemeEntryType') -> bool:
"""
Check if entry is a row table.
Args:
entry (SchemeEntryType): Entry type to check
Returns:
bool: True if entry is a row table
"""
@staticmethod
def is_any_table(entry: 'SchemeEntryType') -> bool:
"""
Check if entry is any type of table.
Args:
entry (SchemeEntryType): Entry type to check
Returns:
bool: True if entry is any table type
"""
@staticmethod
def is_column_table(entry: 'SchemeEntryType') -> bool:
"""
Check if entry is a column table.
Args:
entry (SchemeEntryType): Entry type to check
Returns:
bool: True if entry is a column table
"""
@staticmethod
def is_directory(entry: 'SchemeEntryType') -> bool:
"""
Check if entry is a directory.
Args:
entry (SchemeEntryType): Entry type to check
Returns:
bool: True if entry is a directory
"""
@staticmethod
def is_topic(entry: 'SchemeEntryType') -> bool:
"""
Check if entry is a topic.
Args:
entry (SchemeEntryType): Entry type to check
Returns:
bool: True if entry is a topic
"""Data structures representing database schema entries and their metadata.
class SchemeEntry:
def __init__(
self,
name: str,
owner: str,
type: SchemeEntryType,
effective_permissions: List[str] = None,
permissions: List[Permissions] = None,
size_bytes: int = None
):
"""
Schema entry information.
Args:
name (str): Entry name
owner (str): Entry owner
type (SchemeEntryType): Entry type
effective_permissions (List[str], optional): Effective permissions
permissions (List[Permissions], optional): Permission settings
size_bytes (int, optional): Entry size in bytes
"""
@property
def name(self) -> str:
"""Entry name."""
@property
def owner(self) -> str:
"""Entry owner."""
@property
def type(self) -> SchemeEntryType:
"""Entry type."""
@property
def effective_permissions(self) -> List[str]:
"""Effective permissions for the entry."""
@property
def permissions(self) -> List[Permissions]:
"""Permission settings for the entry."""
@property
def size_bytes(self) -> int:
"""Entry size in bytes."""
@property
def is_directory(self) -> bool:
"""True if entry is a directory."""
@property
def is_table(self) -> bool:
"""True if entry is a table."""
@property
def is_topic(self) -> bool:
"""True if entry is a topic."""
class Directory:
def __init__(
self,
path: str,
children: List[SchemeEntry] = None,
self_entry: SchemeEntry = None
):
"""
Directory information with child entries.
Args:
path (str): Directory path
children (List[SchemeEntry], optional): Child entries
self_entry (SchemeEntry, optional): Directory's own entry info
"""
@property
def path(self) -> str:
"""Directory path."""
@property
def children(self) -> List[SchemeEntry]:
"""Child entries in the directory."""
@property
def self_entry(self) -> SchemeEntry:
"""Directory's own schema entry."""
def get_child(self, name: str) -> Optional[SchemeEntry]:
"""
Get child entry by name.
Args:
name (str): Child entry name
Returns:
Optional[SchemeEntry]: Child entry or None if not found
"""
def has_child(self, name: str) -> bool:
"""
Check if directory has a child with given name.
Args:
name (str): Child entry name
Returns:
bool: True if child exists
"""
def list_tables(self) -> List[SchemeEntry]:
"""
Get all table entries in the directory.
Returns:
List[SchemeEntry]: Table entries
"""
def list_directories(self) -> List[SchemeEntry]:
"""
Get all subdirectory entries.
Returns:
List[SchemeEntry]: Directory entries
"""
def list_topics(self) -> List[SchemeEntry]:
"""
Get all topic entries in the directory.
Returns:
List[SchemeEntry]: Topic entries
"""Access control and permission management for database objects.
class Permissions:
def __init__(
self,
subject: str = None,
permission_names: List[str] = None
):
"""
Permission settings for database objects.
Args:
subject (str, optional): Permission subject (user or group)
permission_names (List[str], optional): List of permission names
"""
@property
def subject(self) -> str:
"""Permission subject."""
@property
def permission_names(self) -> List[str]:
"""List of permission names."""
def add_permission(self, permission_name: str) -> 'Permissions':
"""
Add permission to the list.
Args:
permission_name (str): Permission name to add
Returns:
Permissions: Self for method chaining
"""
def remove_permission(self, permission_name: str) -> 'Permissions':
"""
Remove permission from the list.
Args:
permission_name (str): Permission name to remove
Returns:
Permissions: Self for method chaining
"""
def has_permission(self, permission_name: str) -> bool:
"""
Check if permission is granted.
Args:
permission_name (str): Permission name to check
Returns:
bool: True if permission is granted
"""
class ModifyPermissionsSettings(BaseRequestSettings):
def __init__(
self,
actions: List[PermissionAction] = None,
request_type: str = None,
trace_id: str = None,
timeout: float = None
):
"""
Settings for permission modification operations.
Args:
actions (List[PermissionAction], optional): Permission actions to perform
request_type (str, optional): Request type for logging
trace_id (str, optional): Request tracing identifier
timeout (float, optional): Operation timeout
"""
class PermissionAction:
def __init__(
self,
action_type: PermissionActionType,
subject: str,
permission_names: List[str]
):
"""
Permission modification action.
Args:
action_type (PermissionActionType): Type of action (grant/revoke)
subject (str): Permission subject
permission_names (List[str]): Permissions to modify
"""
@property
def action_type(self) -> PermissionActionType:
"""Type of permission action."""
@property
def subject(self) -> str:
"""Permission subject."""
@property
def permission_names(self) -> List[str]:
"""Permissions being modified."""
class PermissionActionType(enum.Enum):
"""Permission action types."""
GRANT = "grant"
REVOKE = "revoke"
SET = "set"Detailed table structure information retrieved through schema operations.
class TableDescription:
def __init__(
self,
name: str = None,
columns: List[TableColumn] = None,
primary_key: List[str] = None,
indexes: List[TableIndex] = None,
ttl_settings: TtlSettings = None,
storage_settings: StorageSettings = None,
column_families: List[ColumnFamily] = None,
attributes: Dict[str, str] = None,
compaction_policy: CompactionPolicy = None,
partitioning_settings: PartitioningSettings = None,
key_bloom_filter: bool = None,
read_replicas_settings: ReadReplicasSettings = None
):
"""
Complete table description with structure and settings.
Args:
name (str, optional): Table name
columns (List[TableColumn], optional): Table columns
primary_key (List[str], optional): Primary key column names
indexes (List[TableIndex], optional): Secondary indexes
ttl_settings (TtlSettings, optional): Time-to-live settings
storage_settings (StorageSettings, optional): Storage configuration
column_families (List[ColumnFamily], optional): Column families
attributes (Dict[str, str], optional): Table attributes
compaction_policy (CompactionPolicy, optional): Compaction settings
partitioning_settings (PartitioningSettings, optional): Partitioning configuration
key_bloom_filter (bool, optional): Enable key bloom filter
read_replicas_settings (ReadReplicasSettings, optional): Read replica settings
"""
def with_column(self, column: TableColumn) -> 'TableDescription':
"""
Add column to table description.
Args:
column (TableColumn): Column to add
Returns:
TableDescription: Self for method chaining
"""
def with_primary_key(self, *key_names: str) -> 'TableDescription':
"""
Set primary key columns.
Args:
*key_names (str): Primary key column names
Returns:
TableDescription: Self for method chaining
"""
def with_index(self, index: TableIndex) -> 'TableDescription':
"""
Add secondary index to table description.
Args:
index (TableIndex): Index to add
Returns:
TableDescription: Self for method chaining
"""
def with_ttl(self, ttl_settings: TtlSettings) -> 'TableDescription':
"""
Set time-to-live settings.
Args:
ttl_settings (TtlSettings): TTL configuration
Returns:
TableDescription: Self for method chaining
"""
@property
def columns(self) -> List[TableColumn]:
"""Table columns."""
@property
def primary_key(self) -> List[str]:
"""Primary key column names."""
@property
def indexes(self) -> List[TableIndex]:
"""Secondary indexes."""
def get_column(self, name: str) -> Optional[TableColumn]:
"""
Get column by name.
Args:
name (str): Column name
Returns:
Optional[TableColumn]: Column or None if not found
"""
def has_column(self, name: str) -> bool:
"""
Check if table has column with given name.
Args:
name (str): Column name
Returns:
bool: True if column exists
"""
class TableColumn:
def __init__(
self,
name: str,
type: Type,
family: str = None,
not_null: bool = False
):
"""
Table column description.
Args:
name (str): Column name
type (Type): Column YDB type
family (str, optional): Column family name
not_null (bool): Whether column is not nullable
"""
@property
def name(self) -> str:
"""Column name."""
@property
def type(self) -> Type:
"""Column YDB type."""
@property
def family(self) -> str:
"""Column family name."""
@property
def not_null(self) -> bool:
"""Whether column is not nullable."""
class TableIndex:
def __init__(
self,
name: str,
index_columns: List[str],
data_columns: List[str] = None,
global_index: bool = True,
unique: bool = False
):
"""
Table index description.
Args:
name (str): Index name
index_columns (List[str]): Indexed column names
data_columns (List[str], optional): Additional data columns
global_index (bool): Whether index is global
unique (bool): Whether index enforces uniqueness
"""
@property
def name(self) -> str:
"""Index name."""
@property
def index_columns(self) -> List[str]:
"""Indexed column names."""
@property
def data_columns(self) -> List[str]:
"""Additional data columns."""
@property
def global_index(self) -> bool:
"""Whether index is global."""
@property
def unique(self) -> bool:
"""Whether index enforces uniqueness."""Convenience functions for common schema operations.
def make_directory(
driver: Driver,
path: str,
settings: MakeDirectorySettings = None
):
"""
Create directory using driver.
Args:
driver (Driver): YDB driver instance
path (str): Directory path to create
settings (MakeDirectorySettings, optional): Creation settings
"""
def remove_directory(
driver: Driver,
path: str,
settings: RemoveDirectorySettings = None
):
"""
Remove directory using driver.
Args:
driver (Driver): YDB driver instance
path (str): Directory path to remove
settings (RemoveDirectorySettings, optional): Removal settings
"""
def list_directory(
driver: Driver,
path: str,
settings: ListDirectorySettings = None
) -> Directory:
"""
List directory contents using driver.
Args:
driver (Driver): YDB driver instance
path (str): Directory path to list
settings (ListDirectorySettings, optional): Listing settings
Returns:
Directory: Directory information
"""
def describe_path(
driver: Driver,
path: str,
settings: DescribePathSettings = None
) -> SchemeEntry:
"""
Describe path entry using driver.
Args:
driver (Driver): YDB driver instance
path (str): Path to describe
settings (DescribePathSettings, optional): Description settings
Returns:
SchemeEntry: Path entry information
"""import ydb
# Create driver and scheme client
driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
driver.wait(fail_fast=True)
scheme_client = ydb.SchemeClient(driver)
# Create directory hierarchy
scheme_client.make_directory("/local/app")
scheme_client.make_directory("/local/app/tables")
scheme_client.make_directory("/local/app/indexes")
# List directory contents
directory = scheme_client.list_directory("/local/app")
print(f"Directory: {directory.path}")
for entry in directory.children:
print(f" {entry.name} ({entry.type})")
if entry.is_directory:
print(f" Directory owned by: {entry.owner}")
elif entry.is_table:
print(f" Table size: {entry.size_bytes} bytes")# Inspect table schema
def inspect_table_schema(driver, table_path):
scheme_client = ydb.SchemeClient(driver)
# Get table entry information
entry = scheme_client.describe_path(table_path)
print(f"Table: {entry.name}")
print(f"Owner: {entry.owner}")
print(f"Size: {entry.size_bytes} bytes")
# Get detailed table description through session
session_pool = ydb.SessionPool(driver)
def get_table_description(session):
return session.describe_table(table_path)
table_desc = session_pool.retry_operation_sync(get_table_description)
print("\nColumns:")
for column in table_desc.columns:
nullable = "NULL" if not column.not_null else "NOT NULL"
print(f" {column.name}: {column.type} {nullable}")
print(f"\nPrimary Key: {', '.join(table_desc.primary_key)}")
if table_desc.indexes:
print("\nIndexes:")
for index in table_desc.indexes:
index_type = "UNIQUE" if index.unique else "NON-UNIQUE"
print(f" {index.name}: {index_type} on {', '.join(index.index_columns)}")
inspect_table_schema(driver, "/local/users")def traverse_directory_tree(scheme_client, path, level=0):
"""Recursively traverse and display directory tree."""
indent = " " * level
try:
directory = scheme_client.list_directory(path)
print(f"{indent}{directory.self_entry.name}/")
# Process child entries
for entry in directory.children:
if entry.is_directory:
# Recursively traverse subdirectories
child_path = f"{path}/{entry.name}".replace("//", "/")
traverse_directory_tree(scheme_client, child_path, level + 1)
else:
# Display non-directory entries
entry_type = "table" if entry.is_table else "topic" if entry.is_topic else str(entry.type)
print(f"{indent} {entry.name} ({entry_type})")
except ydb.NotFoundError:
print(f"{indent}[Directory not found: {path}]")
# Usage
traverse_directory_tree(scheme_client, "/local")# Modify permissions for a table
def grant_table_permissions(scheme_client, table_path, user, permissions):
"""Grant permissions to user for a table."""
permission_actions = [
ydb.PermissionAction(
action_type=ydb.PermissionActionType.GRANT,
subject=user,
permission_names=permissions
)
]
settings = ydb.ModifyPermissionsSettings(actions=permission_actions)
scheme_client.modify_permissions(table_path, settings)
print(f"Granted permissions {permissions} to {user} for {table_path}")
# Grant read/write permissions
grant_table_permissions(
scheme_client,
"/local/users",
"user@domain.com",
["ydb.generic.read", "ydb.generic.write"]
)
# Check current permissions
entry = scheme_client.describe_path("/local/users")
print("Current permissions:")
for perm in entry.permissions:
print(f" {perm.subject}: {perm.permission_names}")def compare_table_schemas(scheme_client, session_pool, table1_path, table2_path):
"""Compare schemas of two tables."""
def get_table_info(session, table_path):
desc = session.describe_table(table_path)
return {
"columns": {col.name: col.type for col in desc.columns},
"primary_key": desc.primary_key,
"indexes": {idx.name: idx.index_columns for idx in desc.indexes}
}
# Get both table descriptions
table1_info = session_pool.retry_operation_sync(get_table_info, table1_path)
table2_info = session_pool.retry_operation_sync(get_table_info, table2_path)
# Compare columns
print("Column differences:")
all_columns = set(table1_info["columns"].keys()) | set(table2_info["columns"].keys())
for col_name in sorted(all_columns):
type1 = table1_info["columns"].get(col_name, "MISSING")
type2 = table2_info["columns"].get(col_name, "MISSING")
if type1 != type2:
print(f" {col_name}: {type1} vs {type2}")
# Compare primary keys
if table1_info["primary_key"] != table2_info["primary_key"]:
print(f"Primary key difference:")
print(f" Table 1: {table1_info['primary_key']}")
print(f" Table 2: {table2_info['primary_key']}")
# Compare indexes
if table1_info["indexes"] != table2_info["indexes"]:
print("Index differences:")
all_indexes = set(table1_info["indexes"].keys()) | set(table2_info["indexes"].keys())
for idx_name in sorted(all_indexes):
idx1 = table1_info["indexes"].get(idx_name, "MISSING")
idx2 = table2_info["indexes"].get(idx_name, "MISSING")
if idx1 != idx2:
print(f" {idx_name}: {idx1} vs {idx2}")
# Usage
compare_table_schemas(scheme_client, session_pool, "/local/users", "/local/users_backup")# Common type aliases
SchemaPath = str
EntryName = str
PermissionName = str
Subject = str
# Permission collections
ReadPermissions = ["ydb.generic.read"]
WritePermissions = ["ydb.generic.write"]
AdminPermissions = ["ydb.generic.read", "ydb.generic.write", "ydb.generic.manage"]
# Entry type predicates
IsDirectory = Callable[[SchemeEntry], bool]
IsTable = Callable[[SchemeEntry], bool]
IsAnyTable = Callable[[SchemeEntry], bool]