CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-ydb

Officially supported Python client for YDB distributed SQL database

Overview
Eval results
Files

docs/query-service.md

Query Service Operations

A modern query interface built on the new Query service, with support for session management, transaction execution, result streaming, and advanced query options.

Capabilities

Query Service Client

The Query service provides a modern interface for executing YQL queries with advanced features like streaming results and transaction management.

class QueryClientSync:
    """
    Client for the Query service.

    Wraps a YDB driver and hands out query sessions through `session()`.
    """

    def __init__(self, driver: Driver, query_client_settings: QueryClientSettings = None):
        """
        Create a query service client for modern YQL query execution.

        Args:
            driver (Driver): YDB driver instance
            query_client_settings (QueryClientSettings, optional): Client
                configuration. Defaults to None.
        """

    def session(self) -> QuerySession:
        """
        Create a new query session.

        The session exposes `close()` to release its resources; the usage
        examples call it in a `finally` block once the query is done.

        Returns:
            QuerySession: New query session instance
        """

class QueryClientSettings:
    """
    Configuration object accepted by `QueryClientSync`, `QuerySession` and
    `QuerySessionPool`.
    """

    def __init__(self, timeout: float = None):
        """
        Configuration settings for query client.

        Args:
            timeout (float, optional): Default timeout for operations.
                Defaults to None. NOTE(review): the unit is not stated in
                this document (presumably seconds) — confirm.
        """

Query Sessions

Query sessions provide the execution context for YQL queries with transaction management and result streaming.

class QuerySession:
    """
    Execution context for YQL queries.

    Runs individual queries via `execute_query()` and groups several
    queries into one transaction via `transaction()`.
    """

    def __init__(self, driver: Driver, settings: QueryClientSettings = None):
        """
        Create a query session for executing YQL queries.

        Args:
            driver (Driver): YDB driver instance
            settings (QueryClientSettings, optional): Session configuration.
                Defaults to None.
        """

    def execute_query(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        tx_control: QueryTxControl = None,
        settings: ExecuteQuerySettings = None
    ) -> Iterator[ResultSet]:
        """
        Execute YQL query with parameters and transaction control.

        The result is an iterator of result sets, not a list: consume it
        with a `for` loop rather than indexing into it.

        Args:
            query (str): YQL query text
            parameters (Dict[str, Any], optional): Query parameters, keyed by
                the `$name` placeholders used in the query text
            tx_control (QueryTxControl, optional): Transaction control settings
            settings (ExecuteQuerySettings, optional): Execution settings

        Returns:
            Iterator[ResultSet]: Query result sets
        """

    def transaction(self, tx_settings: QueryTxSettings = None) -> QueryTxContext:
        """
        Begin a new transaction for multiple operations.

        The returned object is used as a context manager
        (`with session.transaction(...) as tx:`).

        Args:
            tx_settings (QueryTxSettings, optional): Transaction settings

        Returns:
            QueryTxContext: Transaction context manager
        """

    def close(self):
        """
        Close the query session and release resources.
        """

class ExecuteQuerySettings:
    """
    Per-call execution options, passed as the `settings` argument of
    `QuerySession.execute_query()` and `QueryTxContext.execute()`.
    """

    def __init__(
        self,
        trace_id: str = None,
        request_type: str = None,
        stats_mode: QueryStatsMode = None,
        exec_mode: QueryExecMode = None,
        syntax: QuerySyntax = None
    ):
        """
        Settings for query execution.

        Every argument is optional and defaults to None.

        Args:
            trace_id (str, optional): Request tracing identifier
            request_type (str, optional): Request type for logging
            stats_mode (QueryStatsMode, optional): Statistics collection mode
            exec_mode (QueryExecMode, optional): Query execution mode
            syntax (QuerySyntax, optional): Query syntax version
        """

Query Transactions

Transaction context for executing multiple queries within a single transaction scope.

class QueryTxContext:
    """
    Transaction scope for running several queries as one unit.

    Obtained from `QuerySession.transaction()` and usable as a context
    manager.
    """

    def __init__(self, session: QuerySession, tx_settings: QueryTxSettings = None):
        """
        Transaction context for query execution.

        Args:
            session (QuerySession): Parent query session
            tx_settings (QueryTxSettings, optional): Transaction settings
        """

    def execute(
        self,
        query: str,
        parameters: Dict[str, Any] = None,
        settings: ExecuteQuerySettings = None
    ) -> Iterator[ResultSet]:
        """
        Execute query within the transaction.

        Unlike `QuerySession.execute_query()`, this method takes no
        `tx_control` argument.

        Args:
            query (str): YQL query text
            parameters (Dict[str, Any], optional): Query parameters
            settings (ExecuteQuerySettings, optional): Execution settings

        Returns:
            Iterator[ResultSet]: Query result sets
        """

    def commit(self):
        """
        Commit the transaction.
        """

    def rollback(self):
        """
        Roll back the transaction.
        """

    def __enter__(self) -> 'QueryTxContext':
        """
        Enter transaction context.

        Returns:
            QueryTxContext: The object bound by `with ... as tx`.
        """

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit transaction context with automatic commit/rollback.

        NOTE(review): this document does not show which of the two happens
        on a clean exit. Confirm against the SDK before relying on an
        implicit commit; calling `commit()` explicitly is unambiguous.

        Args:
            exc_type: Exception class raised inside the `with` body, if any
            exc_val: Exception instance raised inside the `with` body, if any
            exc_tb: Traceback of that exception, if any
        """

class QueryTxSettings:
    """
    Settings object accepted by `QuerySession.transaction()` and
    `QueryTxContext`.
    """

    def __init__(self, tx_mode: BaseQueryTxMode = None):
        """
        Transaction settings for query operations.

        Args:
            tx_mode (BaseQueryTxMode, optional): Transaction isolation mode,
                i.e. an instance of one of the `BaseQueryTxMode` subclasses.
                Defaults to None.
        """

Query Session Pool

A pool of reusable query sessions with automatic lifecycle management.

class QuerySessionPool:
    """
    Pool of query sessions.

    Sessions are borrowed with `acquire()` and returned with `release()`;
    `retry_operation()` passes a session to a callback on the caller's
    behalf.
    """

    def __init__(
        self,
        driver: Driver,
        size: int = None,
        query_client_settings: QueryClientSettings = None
    ):
        """
        Pool of query sessions for efficient resource management.

        Args:
            driver (Driver): YDB driver instance
            size (int, optional): Maximum pool size
            query_client_settings (QueryClientSettings, optional): Default client settings
        """

    def acquire(self, timeout: float = None) -> QuerySession:
        """
        Acquire a query session from the pool.

        Pair every call with `release()` so the session returns to the pool.

        Args:
            timeout (float, optional): Acquisition timeout

        Returns:
            QuerySession: Available query session
        """

    def release(self, session: QuerySession):
        """
        Release query session back to the pool.

        Args:
            session (QuerySession): Session to release
        """

    def retry_operation(
        self,
        callee: Callable[[QuerySession], Any],
        retry_settings: RetrySettings = None,
        *args,
        **kwargs
    ) -> Any:
        """
        Execute operation with automatic retry and session management.

        `retry_settings` is the second positional parameter, so a caller
        that forwards positional arguments to `callee` must pass it
        explicitly (e.g. `None`) before them.

        Args:
            callee (Callable): Function to execute with session as first argument
            retry_settings (RetrySettings, optional): Custom retry configuration
            *args: Additional arguments for callee
            **kwargs: Additional keyword arguments for callee

        Returns:
            Any: Result of callee execution
        """

    def stop(self, timeout: float = None):
        """
        Stop the session pool and close all sessions.

        Args:
            timeout (float, optional): Shutdown timeout
        """

Transaction Modes

Transaction isolation modes for query operations.

class BaseQueryTxMode:
    """
    Base class for query transaction modes.

    Subclass instances are what `QueryTxSettings` and `QueryTxControl`
    accept as `tx_mode`.
    """

class QuerySerializableReadWrite(BaseQueryTxMode):
    """
    Serializable read-write transaction mode.

    This is the mode used by the Transaction Management example.
    """
    
class QueryOnlineReadOnly(BaseQueryTxMode):
    """Online read-only transaction mode, optionally allowing inconsistent reads."""

    def __init__(self, allow_inconsistent_reads: bool = False):
        """
        Online read-only transaction mode.

        Args:
            allow_inconsistent_reads (bool): Allow reading inconsistent data.
                Defaults to False.
        """

class QuerySnapshotReadOnly(BaseQueryTxMode):
    """Snapshot read-only transaction mode; takes no constructor arguments here."""

class QueryStaleReadOnly(BaseQueryTxMode):
    """Stale read-only transaction mode; takes no constructor arguments here."""

class QueryTxControl:
    """
    Per-query transaction control, passed as the `tx_control` argument of
    `QuerySession.execute_query()`.
    """

    def __init__(
        self,
        tx_mode: BaseQueryTxMode = None,
        commit_tx: bool = False,
        begin_tx: bool = True
    ):
        """
        Transaction control settings for individual queries.

        Args:
            tx_mode (BaseQueryTxMode, optional): Transaction mode.
                Defaults to None.
            commit_tx (bool): Commit transaction after query execution.
                Defaults to False.
            begin_tx (bool): Begin new transaction if none exists.
                Defaults to True.
        """

Query Results

Result handling for query execution with streaming and metadata support.

class ResultSet:
    """
    One result set produced by a query: column metadata plus rows.

    Instances are what `execute_query()` / `execute()` iterators yield.
    """

    def __init__(self, result_data, columns):
        """
        Query result set with rows and metadata.

        Args:
            result_data: Raw result data
            columns: Column metadata
        """

    @property
    def columns(self) -> List[Column]:
        """
        Get column metadata for the result set.

        Returns:
            List[Column]: Column definitions
        """

    @property
    def rows(self) -> Iterator[Row]:
        """
        Iterate over result rows.

        Declared as an iterator, so use a `for` loop or `list(...)` rather
        than indexing or `len()`.

        Returns:
            Iterator[Row]: Result row iterator
        """

    def fetch_all(self) -> List[Row]:
        """
        Fetch all remaining rows from the result set.

        Returns:
            List[Row]: All remaining rows
        """

class Column:
    """Name and YDB type of one column, as listed by `ResultSet.columns`."""

    def __init__(self, name: str, type: Type):
        """
        Column metadata for result sets.

        Args:
            name (str): Column name
            type (Type): YDB column type
        """

    @property
    def name(self) -> str:
        """
        Column name.

        Returns:
            str: The column's name
        """

    @property
    def type(self) -> Type:
        """
        Column YDB type.

        Returns:
            Type: The column's YDB type
        """

class Row:
    """
    One result row.

    Values can be read by position or column name (`row[0]`, `row["name"]`)
    or as attributes (`row.name`), as the usage examples do.
    """

    def __getitem__(self, key: Union[int, str]) -> Any:
        """
        Access row values by index or column name.

        Args:
            key (Union[int, str]): Column index or name

        Returns:
            Any: Column value
        """

    def __getattr__(self, name: str) -> Any:
        """
        Access row values by column name as attributes.

        Args:
            name (str): Column name

        Returns:
            Any: Column value
        """

Query Options

Configuration options for query execution behavior.

class QueryStatsMode(enum.Enum):
    """
    Statistics collection modes for query execution.

    Passed as the `stats_mode` argument of `ExecuteQuerySettings`.
    NOTE(review): the meaning of each member is not described in this
    document; the names suggest increasing levels of detail — confirm.
    """
    NONE = "none"
    BASIC = "basic"
    FULL = "full"
    PROFILE = "profile"

class QueryExecMode(enum.Enum):
    """
    Query execution modes.

    Passed as the `exec_mode` argument of `ExecuteQuerySettings`; the
    Advanced Query Options example uses `EXPLAIN`.
    """
    PARSE = "parse"
    VALIDATE = "validate"
    EXPLAIN = "explain"
    EXECUTE = "execute"

class QuerySyntax(enum.Enum):
    """
    Query syntax versions.

    Passed as the `syntax` argument of `ExecuteQuerySettings`.
    """
    YQL_V1 = "yql_v1"
    PG = "pg"

class QueryExplainResultFormat(enum.Enum):
    """
    Explain result format options.

    NOTE(review): no signature shown in this document accepts this enum, so
    where it is passed is unknown — confirm against the SDK.
    """
    TEXT = "text"
    JSON = "json"

Usage Examples

Basic Query Execution

import ydb

# Build the driver first, then the query client on top of it
driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
query_client = ydb.QueryClientSync(driver)

# Run a trivial query; the session is closed even if reading the results fails
session = query_client.session()
try:
    result_sets = session.execute_query("SELECT 1 AS value")
    # Flatten "result sets -> rows -> value" into one lazy stream
    values = (row.value for result_set in result_sets for row in result_set.rows)
    for value in values:
        print(f"Value: {value}")
finally:
    session.close()

Parameterized Queries

# Bind values through $-prefixed placeholders instead of formatting them into the SQL text
query = """
    SELECT name, email 
    FROM users 
    WHERE user_id = $user_id AND status = $status
"""
parameters = {"$user_id": 123, "$status": "active"}

result_sets = session.execute_query(query, parameters=parameters)
# Walk every row of every result set as one lazy stream
rows = (row for result_set in result_sets for row in result_set.rows)
for row in rows:
    print(f"User: {row.name}, Email: {row.email}")

Transaction Management

from datetime import datetime

# Execute multiple queries in one serializable read-write transaction.
# `session` is an open QuerySession, as created in the basic example.
tx_settings = ydb.QueryTxSettings(ydb.QuerySerializableReadWrite())
with session.transaction(tx_settings) as tx:
    # Update user status
    tx.execute(
        "UPDATE users SET status = 'inactive' WHERE user_id = $user_id",
        parameters={"$user_id": 123}
    )

    # Log the change
    tx.execute(
        "INSERT INTO audit_log (user_id, action, timestamp) VALUES ($user_id, $action, $ts)",
        parameters={
            "$user_id": 123,
            "$action": "deactivate",
            # NOTE(review): utcnow() is deprecated since Python 3.12; switch to a
            # timezone-aware value once the driver is confirmed to accept one.
            "$ts": datetime.utcnow()
        }
    )

    # Commit explicitly instead of relying on what happens at context exit:
    # both statements become visible together, and an exception raised above
    # skips this line so nothing is committed.
    tx.commit()

Query Session Pool

# Use session pool for better resource management
# `size` is the maximum number of sessions the pool will hold.
pool = ydb.QuerySessionPool(driver, size=10)

def execute_user_query(session, user_id):
    """
    Return all rows of `users` whose `user_id` matches, as a list.

    Intended as the callee for `QuerySessionPool.retry_operation`, which
    supplies `session` as the first argument.

    Args:
        session: Query session used to run the query
        user_id: Value bound to the `$user_id` placeholder

    Returns:
        list: Matching rows; empty when the query yields nothing
    """
    result_sets = session.execute_query(
        "SELECT * FROM users WHERE user_id = $user_id",
        parameters={"$user_id": user_id}
    )
    # execute_query returns an iterator, which cannot be indexed with [0].
    # Iterating it works for any iterable, handles an empty result, and
    # collects rows from every result set the query yields.
    return [row for result_set in result_sets for row in result_set.rows]

# Execute with automatic retry and session management
# retry_operation(callee, retry_settings, *args): the explicit None fills the
# retry_settings slot so that 123 is forwarded to the callee as user_id.
try:
    users = pool.retry_operation(execute_user_query, None, 123)
    print(f"Found {len(users)} users")
finally:
    # Always shut the pool down so its sessions are closed
    pool.stop()

Advanced Query Options

# Execute with statistics and explain
# stats_mode, exec_mode and syntax take members of the QueryStatsMode,
# QueryExecMode and QuerySyntax enums respectively.
settings = ydb.ExecuteQuerySettings(
    stats_mode=ydb.QueryStatsMode.FULL,
    exec_mode=ydb.QueryExecMode.EXPLAIN,
    syntax=ydb.QuerySyntax.YQL_V1
)

# execute_query returns an iterator of result sets; iterate `result_sets` to
# read the output.
# NOTE(review): what EXPLAIN mode yields is not shown in this document — confirm.
result_sets = session.execute_query(
    "SELECT COUNT(*) FROM large_table",
    settings=settings
)

Type Definitions

# Type aliases for common patterns
# QueryParameters: the `parameters` mapping of execute_query() / execute()
QueryParameters = Dict[str, Any]
# QueryResult: the return type of execute_query() / execute()
QueryResult = Iterator[ResultSet]
# QueryCallback: the `callee` signature expected by QuerySessionPool.retry_operation()
QueryCallback = Callable[[QuerySession], Any]

# Common transaction modes
# Short aliases for the classes themselves (not instances); call them to
# build a mode, e.g. SerializableRW().
SerializableRW = QuerySerializableReadWrite
OnlineRO = QueryOnlineReadOnly
SnapshotRO = QuerySnapshotReadOnly
StaleRO = QueryStaleReadOnly

Install with Tessl CLI

npx tessl i tessl/pypi-ydb

docs

async-operations.md

authentication.md

data-types.md

dbapi-interface.md

driver-connection.md

error-handling.md

index.md

query-service.md

schema-operations.md

sqlalchemy-integration.md

table-operations.md

topic-operations.md

tile.json