Officially supported Python client for YDB distributed SQL database
Modern query interface supporting session management, transaction execution, result streaming, and advanced query options with the new Query service.
The Query service provides a modern interface for executing YQL queries with advanced features like streaming results and transaction management.
class QueryClientSync:
    """Client for the Query service (signature-only API reference).

    Method bodies are omitted in this reference; each docstring states the
    documented contract of the corresponding call.
    """

    def __init__(self, driver: Driver, query_client_settings: QueryClientSettings = None):
        """Create a query service client for modern YQL query execution.

        Args:
            driver (Driver): YDB driver instance.
            query_client_settings (QueryClientSettings, optional): Client
                configuration.
        """

    def session(self) -> QuerySession:
        """Create a new query session.

        Returns:
            QuerySession: New query session instance.
        """
class QueryClientSettings:
def __init__(self, timeout: float = None):
"""
Configuration settings for query client.
Args:
timeout (float, optional): Default timeout for operations
"""

Query sessions provide the execution context for YQL queries with transaction management and result streaming.
class QuerySession:
    """Execution context for YQL queries (signature-only API reference).

    Method bodies are omitted in this reference; each docstring states the
    documented contract of the corresponding call.
    """

    def __init__(self, driver: Driver, settings: QueryClientSettings = None):
        """Create a query session for executing YQL queries.

        Args:
            driver (Driver): YDB driver instance.
            settings (QueryClientSettings, optional): Session configuration.
        """

    def execute_query(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        tx_control: QueryTxControl = None,
        settings: ExecuteQuerySettings = None,
    ) -> Iterator[ResultSet]:
        """Execute a YQL query with parameters and transaction control.

        Args:
            query (str): YQL query text.
            parameters (Dict[str, Any], optional): Query parameters.
            tx_control (QueryTxControl, optional): Transaction control
                settings.
            settings (ExecuteQuerySettings, optional): Execution settings.

        Returns:
            Iterator[ResultSet]: Query result sets. The annotation is an
            iterator, so callers should iterate rather than index.
        """

    def transaction(self, tx_settings: QueryTxSettings = None) -> QueryTxContext:
        """Begin a new transaction for multiple operations.

        Args:
            tx_settings (QueryTxSettings, optional): Transaction settings.

        Returns:
            QueryTxContext: Transaction context manager.
        """

    def close(self):
        """Close the query session and release resources."""
class ExecuteQuerySettings:
def __init__(
self,
trace_id: str = None,
request_type: str = None,
stats_mode: QueryStatsMode = None,
exec_mode: QueryExecMode = None,
syntax: QuerySyntax = None
):
"""
Settings for query execution.
Args:
trace_id (str, optional): Request tracing identifier
request_type (str, optional): Request type for logging
stats_mode (QueryStatsMode, optional): Statistics collection mode
exec_mode (QueryExecMode, optional): Query execution mode
syntax (QuerySyntax, optional): Query syntax version
"""

Transaction context for executing multiple queries within a single transaction scope.
class QueryTxContext:
    """Transaction scope for running several queries (signature-only reference).

    Method bodies are omitted in this reference; each docstring states the
    documented contract of the corresponding call.
    """

    def __init__(self, session: QuerySession, tx_settings: QueryTxSettings = None):
        """Create a transaction context for query execution.

        Args:
            session (QuerySession): Parent query session.
            tx_settings (QueryTxSettings, optional): Transaction settings.
        """

    def execute(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        settings: ExecuteQuerySettings = None,
    ) -> Iterator[ResultSet]:
        """Execute a query within the transaction.

        Args:
            query (str): YQL query text.
            parameters (Dict[str, Any], optional): Query parameters.
            settings (ExecuteQuerySettings, optional): Execution settings.

        Returns:
            Iterator[ResultSet]: Query result sets.
        """

    def commit(self):
        """Commit the transaction."""

    def rollback(self):
        """Rollback the transaction."""

    def __enter__(self) -> 'QueryTxContext':
        """Enter transaction context."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit transaction context with automatic commit/rollback."""
class QueryTxSettings:
def __init__(self, tx_mode: BaseQueryTxMode = None):
"""
Transaction settings for query operations.
Args:
tx_mode (BaseQueryTxMode, optional): Transaction isolation mode
"""

Pool management for query sessions with automatic lifecycle management.
class QuerySessionPool:
def __init__(
self,
driver: Driver,
size: int = None,
query_client_settings: QueryClientSettings = None
):
"""
Pool of query sessions for efficient resource management.
Args:
driver (Driver): YDB driver instance
size (int, optional): Maximum pool size
query_client_settings (QueryClientSettings, optional): Default client settings
"""
def acquire(self, timeout: float = None) -> QuerySession:
"""
Acquire a query session from the pool.
Args:
timeout (float, optional): Acquisition timeout
Returns:
QuerySession: Available query session
"""
def release(self, session: QuerySession):
"""
Release query session back to the pool.
Args:
session (QuerySession): Session to release
"""
def retry_operation(
self,
callee: Callable[[QuerySession], Any],
retry_settings: RetrySettings = None,
*args,
**kwargs
) -> Any:
"""
Execute operation with automatic retry and session management.
Args:
callee (Callable): Function to execute with session as first argument
retry_settings (RetrySettings, optional): Custom retry configuration
*args: Additional arguments for callee
**kwargs: Additional keyword arguments for callee
Returns:
Any: Result of callee execution
"""
def stop(self, timeout: float = None):
"""
Stop the session pool and close all sessions.
Args:
timeout (float, optional): Shutdown timeout
"""

Transaction isolation modes for query operations.
class BaseQueryTxMode:
    """Common base class for all query transaction modes.

    Declares no members in this reference; it exists so that parameters
    annotated with ``BaseQueryTxMode`` accept any concrete mode.
    """
class QuerySerializableReadWrite(BaseQueryTxMode):
    """Serializable read-write transaction mode."""
class QueryOnlineReadOnly(BaseQueryTxMode):
    """Online read-only transaction mode (signature-only API reference)."""

    def __init__(self, allow_inconsistent_reads: bool = False):
        """Configure the online read-only transaction mode.

        Args:
            allow_inconsistent_reads (bool): Allow reading inconsistent data.
                Defaults to ``False``.
        """
class QuerySnapshotReadOnly(BaseQueryTxMode):
    """Snapshot read-only transaction mode."""
class QueryStaleReadOnly(BaseQueryTxMode):
    """Stale read-only transaction mode."""
class QueryTxControl:
def __init__(
self,
tx_mode: BaseQueryTxMode = None,
commit_tx: bool = False,
begin_tx: bool = True
):
"""
Transaction control settings for individual queries.
Args:
tx_mode (BaseQueryTxMode, optional): Transaction mode
commit_tx (bool): Commit transaction after query execution
begin_tx (bool): Begin new transaction if none exists
"""

Result handling for query execution with streaming and metadata support.
class ResultSet:
    """Query result set with rows and metadata (signature-only API reference).

    Method bodies are omitted in this reference; each docstring states the
    documented contract of the corresponding member.
    """

    def __init__(self, result_data, columns):
        """Create a result set.

        Args:
            result_data: Raw result data.
            columns: Column metadata.
        """

    @property
    def columns(self) -> List[Column]:
        """Get column metadata for the result set.

        Returns:
            List[Column]: Column definitions.
        """

    @property
    def rows(self) -> Iterator[Row]:
        """Iterate over result rows.

        Returns:
            Iterator[Row]: Result row iterator.
        """

    def fetch_all(self) -> List[Row]:
        """Fetch all remaining rows from the result set.

        Returns:
            List[Row]: All remaining rows.
        """
class Column:
    """Column metadata for result sets (signature-only API reference)."""

    # NOTE: the parameter name `type` shadows the builtin; it is kept because
    # it is part of the documented constructor signature.
    def __init__(self, name: str, type: Type):
        """Create column metadata.

        Args:
            name (str): Column name.
            type (Type): YDB column type.
        """

    @property
    def name(self) -> str:
        """Column name."""

    @property
    def type(self) -> Type:
        """Column YDB type."""
class Row:
def __getitem__(self, key: Union[int, str]) -> Any:
"""
Access row values by index or column name.
Args:
key (Union[int, str]): Column index or name
Returns:
Any: Column value
"""
def __getattr__(self, name: str) -> Any:
"""
Access row values by column name as attributes.
Args:
name (str): Column name
Returns:
Any: Column value
"""

Configuration options for query execution behavior.
class QueryStatsMode(enum.Enum):
    """Statistics collection modes for query execution.

    Each member's value is the lowercase form of its name.
    """

    NONE = "none"
    BASIC = "basic"
    FULL = "full"
    PROFILE = "profile"
class QueryExecMode(enum.Enum):
    """Query execution modes.

    Each member's value is the lowercase form of its name.
    """

    PARSE = "parse"
    VALIDATE = "validate"
    EXPLAIN = "explain"
    EXECUTE = "execute"
class QuerySyntax(enum.Enum):
    """Query syntax versions."""

    YQL_V1 = "yql_v1"
    PG = "pg"
class QueryExplainResultFormat(enum.Enum):
"""Explain result format options."""
TEXT = "text"
JSON = "json"

import ydb
# Create driver and query client
driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
query_client = ydb.QueryClientSync(driver)

# Execute simple query
session = query_client.session()
try:
    result_sets = session.execute_query("SELECT 1 AS value")
    # execute_query yields result sets; each result set exposes its rows.
    for result_set in result_sets:
        for row in result_set.rows:
            # Columns are reachable as attributes, here the `value` alias.
            print(f"Value: {row.value}")
finally:
    # Release the session even when the query raises.
    session.close()

# Execute query with parameters
# Parameter names carry the `$` prefix used inside the query text.
parameters = {"$user_id": 123, "$status": "active"}
query = """
SELECT name, email
FROM users
WHERE user_id = $user_id AND status = $status
"""
result_sets = session.execute_query(query, parameters=parameters)
for result_set in result_sets:
    for row in result_set.rows:
        print(f"User: {row.name}, Email: {row.email}")

# Execute multiple queries in a transaction
# `datetime.utcnow()` below needs the datetime class in scope.
from datetime import datetime

# The settings and mode classes are reached through the `ydb` package, like
# every other SDK name in these examples (only `import ydb` is in scope).
with session.transaction(ydb.QueryTxSettings(ydb.QuerySerializableReadWrite())) as tx:
    # Update user status
    tx.execute(
        "UPDATE users SET status = 'inactive' WHERE user_id = $user_id",
        parameters={"$user_id": 123},
    )
    # Log the change
    tx.execute(
        "INSERT INTO audit_log (user_id, action, timestamp) VALUES ($user_id, $action, $ts)",
        parameters={
            "$user_id": 123,
            "$action": "deactivate",
            "$ts": datetime.utcnow(),
        },
    )
# Transaction automatically commits on context exit

# Use session pool for better resource management
# `size` is the maximum pool size.
pool = ydb.QuerySessionPool(driver, size=10)
def execute_user_query(session, user_id):
    """Return the rows of ``users`` whose ``user_id`` matches.

    Shaped as a ``QuerySessionPool.retry_operation`` callee: the session is
    the first positional argument, extra arguments follow.

    Args:
        session: Object exposing ``execute_query(query, parameters=...)``.
        user_id: Value bound to the ``$user_id`` query parameter.

    Returns:
        list: Rows gathered from every result set the query yields (an empty
        list when nothing is yielded).
    """
    result_sets = session.execute_query(
        "SELECT * FROM users WHERE user_id = $user_id",
        parameters={"$user_id": user_id},
    )
    # execute_query is annotated as returning an iterator, which cannot be
    # indexed; walk it and materialise the rows before returning.
    return [row for result_set in result_sets for row in result_set.rows]
# Execute with automatic retry and session management
try:
    # Positional layout follows retry_operation(callee, retry_settings, *args):
    # None selects no custom retry settings, 123 is forwarded as user_id.
    users = pool.retry_operation(execute_user_query, None, 123)
    print(f"Found {len(users)} users")
finally:
    # Stop the pool whether or not the operation succeeded.
    pool.stop()

# Execute with statistics and explain
# Bundle the per-query options: full statistics, explain mode, YQL v1 syntax.
settings = ydb.ExecuteQuerySettings(
    stats_mode=ydb.QueryStatsMode.FULL,
    exec_mode=ydb.QueryExecMode.EXPLAIN,
    syntax=ydb.QuerySyntax.YQL_V1,
)
# Pass the bundle through the `settings` keyword of execute_query.
result_sets = session.execute_query(
    "SELECT COUNT(*) FROM large_table",
    settings=settings,
)

# Type aliases for common patterns
# Same shape as the `parameters` argument of execute_query / execute.
QueryParameters = Dict[str, Any]
# Same shape as the return annotation of execute_query / execute.
QueryResult = Iterator[ResultSet]
# Same shape as the `callee` argument of QuerySessionPool.retry_operation.
QueryCallback = Callable[[QuerySession], Any]
# Common transaction modes
# Each short name is bound to the very same class object, not a subclass.
SerializableRW = QuerySerializableReadWrite
OnlineRO = QueryOnlineReadOnly
SnapshotRO = QuerySnapshotReadOnly
StaleRO = QueryStaleReadOnly