CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ydb

Officially supported Python client for YDB distributed SQL database

Overview
Eval results
Files

error-handling.mddocs/

Error Handling and Retries

Comprehensive error handling with detailed error hierarchies, retry strategies, backoff configurations, and operation result processing.

Capabilities

Error Hierarchy

YDB Python SDK provides a comprehensive error hierarchy for handling different types of failures.

class Error(Exception):
    """Base class for all YDB errors."""
    
    def __init__(
        self,
        message: str,
        issues: Optional[Iterable[IssueMessage]] = None
    ):
        """
        Base YDB error.
        
        Args:
            message (str): Error message
            issues (Optional[Iterable[IssueMessage]]): Detailed error issues
        """
        super().__init__(message)
        self.message = message
        self.issues = issues or []

    @property
    def status(self) -> Optional[int]:
        """Error status code."""

    @property
    def issues(self) -> List[IssueMessage]:
        """Detailed error issues."""

class RetryableError(Error):
    """Base class for errors that can be retried."""

class BadRequestError(Error):
    """Request validation or syntax errors."""
    status = StatusCode.BAD_REQUEST

class UnauthorizedError(Error):
    """Authentication errors."""
    status = StatusCode.UNAUTHORIZED

class ForbiddenError(Error):
    """Authorization/permission errors."""
    status = StatusCode.FORBIDDEN

class NotFoundError(Error):
    """Resource not found errors."""
    status = StatusCode.NOT_FOUND

class AlreadyExistsError(Error):
    """Resource already exists errors."""
    status = StatusCode.ALREADY_EXISTS

class PreconditionFailedError(RetryableError):
    """Precondition check failures."""
    status = StatusCode.PRECONDITION_FAILED

class AbortedError(RetryableError):
    """Aborted operations due to conflicts."""
    status = StatusCode.ABORTED

class UnavailableError(RetryableError):
    """Service temporarily unavailable."""
    status = StatusCode.UNAVAILABLE

class OverloadedError(RetryableError):
    """Service overloaded, backoff required."""
    status = StatusCode.OVERLOADED

class TimeoutError(RetryableError):
    """Operation timeout errors."""
    status = StatusCode.TIMEOUT

class CancelledError(Error):
    """Cancelled operations."""
    status = StatusCode.CANCELLED

class UndeterminedError(RetryableError):
    """Operations with undetermined outcome."""
    status = StatusCode.UNDETERMINED

class InternalError(RetryableError):
    """Internal service errors."""
    status = StatusCode.INTERNAL_ERROR

class UnsupportedError(Error):
    """Unsupported operations."""
    status = StatusCode.UNSUPPORTED

class SchemeError(Error):
    """Schema-related errors."""
    status = StatusCode.SCHEME_ERROR

class GenericError(Error):
    """Generic/unclassified errors."""
    status = StatusCode.GENERIC_ERROR

Session-Specific Errors

Errors related to session lifecycle and management.

class BadSessionError(RetryableError):
    """Invalid or corrupted session state."""
    status = StatusCode.BAD_SESSION

class SessionExpiredError(RetryableError):
    """Session has expired and needs renewal."""
    status = StatusCode.SESSION_EXPIRED

class SessionBusyError(RetryableError):
    """Session is busy with another operation."""
    status = StatusCode.SESSION_BUSY

class SessionPoolEmptyError(RetryableError):
    """No available sessions in the pool."""
    status = StatusCode.SESSION_POOL_EMPTY

class SessionPoolClosedError(Error):
    """Session pool has been closed."""
    status = StatusCode.SESSION_POOL_CLOSED

class QueryCacheEmptyError(RetryableError):
    """Query not found in prepared query cache."""

Connection Errors

Network and transport-level error conditions.

class ConnectionError(RetryableError):
    """Base class for connection-related errors."""

class ConnectionLostError(ConnectionError):
    """Connection lost during operation."""
    status = StatusCode.CONNECTION_LOST

class ConnectionFailureError(ConnectionError):
    """Failed to establish connection."""
    status = StatusCode.CONNECTION_FAILURE

class DeadlineExceededError(ConnectionError):
    """Operation exceeded deadline."""
    status = StatusCode.DEADLINE_EXCEEDED

class ClientInternalError(Error):
    """Client-side internal errors."""
    status = StatusCode.CLIENT_INTERNAL_ERROR

class ClientResourceExhaustedError(Error):
    """Client resources exhausted."""

class ClientDiscoveryError(ConnectionError):
    """Endpoint discovery failures."""

class UnauthenticatedError(Error):
    """Authentication credential errors."""
    status = StatusCode.UNAUTHENTICATED

class UnimplementedError(Error):
    """Unimplemented features or operations."""
    status = StatusCode.UNIMPLEMENTED

Status Codes

Enumeration of all possible YDB status codes.

class StatusCode(enum.IntEnum):
    """YDB operation status codes."""
    
    # Success
    SUCCESS = 0
    
    # Client errors (4xx equivalent)
    BAD_REQUEST = 400
    UNAUTHORIZED = 401
    FORBIDDEN = 403
    NOT_FOUND = 404
    ALREADY_EXISTS = 409
    PRECONDITION_FAILED = 412
    UNSUPPORTED = 501
    
    # Server errors (5xx equivalent)
    INTERNAL_ERROR = 500
    UNAVAILABLE = 503
    TIMEOUT = 504
    OVERLOADED = 503
    
    # YDB-specific
    ABORTED = 10
    CANCELLED = 1
    UNDETERMINED = 2
    SCHEME_ERROR = 20
    GENERIC_ERROR = 21
    BAD_SESSION = 30
    SESSION_EXPIRED = 31
    SESSION_BUSY = 32
    
    # Transport errors
    CONNECTION_LOST = 401010
    CONNECTION_FAILURE = 401020
    DEADLINE_EXCEEDED = 401030
    CLIENT_INTERNAL_ERROR = 401040
    UNIMPLEMENTED = 401050
    
    # Client pool errors  
    UNAUTHENTICATED = 402030
    SESSION_POOL_EMPTY = 402040
    SESSION_POOL_CLOSED = 402050

def is_retryable_error(error: Exception) -> bool:
    """
    Check if error is retryable.
    
    Args:
        error (Exception): Error to check
        
    Returns:
        bool: True if error can be retried
    """

def get_error_class(status_code: int) -> Type[Error]:
    """
    Get error class for status code.
    
    Args:
        status_code (int): YDB status code
        
    Returns:
        Type[Error]: Appropriate error class
    """

Issue Messages

Detailed error information with nested issue structures.

class IssueMessage:
    def __init__(
        self,
        message: str,
        issue_code: int = None,
        severity: int = None,
        issues: List['IssueMessage'] = None
    ):
        """
        Detailed error issue information.
        
        Args:
            message (str): Issue description
            issue_code (int, optional): Issue-specific code
            severity (int, optional): Issue severity level
            issues (List[IssueMessage], optional): Nested sub-issues
        """

    @property
    def message(self) -> str:
        """Issue description message."""

    @property
    def issue_code(self) -> Optional[int]:
        """Issue-specific code."""

    @property
    def severity(self) -> Optional[int]:
        """Issue severity level."""

    @property
    def issues(self) -> List['IssueMessage']:
        """Nested sub-issues."""

    def to_dict(self) -> dict:
        """
        Convert issue to dictionary representation.
        
        Returns:
            dict: Issue as dictionary
        """

    def __str__(self) -> str:
        """String representation of the issue."""

class IssueSeverity(enum.IntEnum):
    """Issue severity levels."""
    INFO = 1
    NOTICE = 2
    WARNING = 3
    ERROR = 4
    FATAL = 5

Retry Configuration

Configurable retry strategies with exponential backoff and jitter.

class RetrySettings:
    def __init__(
        self,
        max_retries: int = 10,
        max_session_acquire_timeout: float = None,
        fast_backoff_settings: BackoffSettings = None,
        slow_backoff_settings: BackoffSettings = None,
        retry_not_found: bool = False,
        retry_internal_error: bool = True,
        unknown_error_handler: Callable[[Exception], bool] = None,
        on_ydb_error_callback: Callable[[YdbError], None] = None
    ):
        """
        Retry configuration for YDB operations.
        
        Args:
            max_retries (int): Maximum number of retry attempts
            max_session_acquire_timeout (float, optional): Session acquisition timeout
            fast_backoff_settings (BackoffSettings, optional): Fast retry backoff
            slow_backoff_settings (BackoffSettings, optional): Slow retry backoff
            retry_not_found (bool): Whether to retry NotFound errors
            retry_internal_error (bool): Whether to retry internal errors
            unknown_error_handler (Callable, optional): Handler for unknown errors
            on_ydb_error_callback (Callable, optional): Error callback function
        """

    @property
    def max_retries(self) -> int:
        """Maximum retry attempts."""

    @property
    def fast_backoff_settings(self) -> BackoffSettings:
        """Fast backoff configuration."""

    @property
    def slow_backoff_settings(self) -> BackoffSettings:
        """Slow backoff configuration."""

    def with_max_retries(self, max_retries: int) -> 'RetrySettings':
        """
        Create copy with different max retries.
        
        Args:
            max_retries (int): New max retry count
            
        Returns:
            RetrySettings: New retry settings instance
        """

    def with_fast_backoff(self, backoff: BackoffSettings) -> 'RetrySettings':
        """
        Create copy with different fast backoff.
        
        Args:
            backoff (BackoffSettings): New fast backoff settings
            
        Returns:
            RetrySettings: New retry settings instance
        """

class BackoffSettings:
    def __init__(
        self,
        slot_duration: float = 1.0,
        ceiling: int = 6,
        max_backoff: float = 32.0,
        jitter_limit: float = 1.0,
        uncertain_ratio: float = 0.1
    ):
        """
        Exponential backoff configuration.
        
        Args:
            slot_duration (float): Base slot duration in seconds
            ceiling (int): Backoff ceiling (2^ceiling * slot_duration max)
            max_backoff (float): Maximum backoff time in seconds
            jitter_limit (float): Maximum jitter multiplier
            uncertain_ratio (float): Ratio for uncertain error handling
        """

    @property
    def slot_duration(self) -> float:
        """Base slot duration."""

    @property
    def ceiling(self) -> int:
        """Backoff ceiling exponent."""

    @property
    def max_backoff(self) -> float:
        """Maximum backoff time."""

    def calculate_backoff(self, attempt: int) -> float:
        """
        Calculate backoff time for attempt.
        
        Args:
            attempt (int): Retry attempt number (0-based)
            
        Returns:
            float: Backoff time in seconds
        """

    def with_jitter(self, backoff: float) -> float:
        """
        Apply jitter to backoff time.
        
        Args:
            backoff (float): Base backoff time
            
        Returns:
            float: Jittered backoff time
        """

# Predefined backoff settings
DEFAULT_FAST_BACKOFF = BackoffSettings(slot_duration=0.005, ceiling=10, max_backoff=0.2)
DEFAULT_SLOW_BACKOFF = BackoffSettings(slot_duration=1.0, ceiling=6, max_backoff=32.0)

Retry Operations

High-level retry functionality for database operations.

def retry_operation_sync(
    callee: Callable[..., Any],
    retry_settings: RetrySettings = None,
    session_pool: SessionPool = None,
    *args,
    **kwargs
) -> Any:
    """
    Execute operation with retry logic.
    
    Args:
        callee (Callable): Function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        session_pool (SessionPool, optional): Session pool for session-based operations
        *args: Arguments for callee
        **kwargs: Keyword arguments for callee
        
    Returns:
        Any: Result of successful callee execution
        
    Raises:
        Error: Final error if all retries exhausted
    """

async def retry_operation(
    callee: Callable[..., Awaitable[Any]],
    retry_settings: RetrySettings = None,
    session_pool: SessionPool = None,
    *args,
    **kwargs
) -> Any:
    """
    Execute async operation with retry logic.
    
    Args:
        callee (Callable): Async function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        session_pool (SessionPool, optional): Session pool for session-based operations
        *args: Arguments for callee
        **kwargs: Keyword arguments for callee
        
    Returns:
        Any: Result of successful callee execution
        
    Raises:
        Error: Final error if all retries exhausted
    """

class YdbRetryOperationSleepOpt:
    def __init__(
        self,
        timeout: float = None,
        backoff_settings: BackoffSettings = None
    ):
        """
        Sleep options for retry operations.
        
        Args:
            timeout (float, optional): Maximum sleep time
            backoff_settings (BackoffSettings, optional): Backoff configuration
        """

class YdbRetryOperationFinalResult:
    def __init__(
        self,
        result: Any = None,
        error: Exception = None,
        attempts: int = 0
    ):
        """
        Final result of retry operation.
        
        Args:
            result (Any, optional): Operation result if successful
            error (Exception, optional): Final error if failed
            attempts (int): Number of attempts made
        """

    @property
    def is_success(self) -> bool:
        """True if operation succeeded."""

    @property
    def is_failure(self) -> bool:
        """True if operation failed."""

def retry_operation_impl(
    callee: Callable,
    retry_settings: RetrySettings = None,
    *args,
    **kwargs
) -> YdbRetryOperationFinalResult:
    """
    Low-level retry implementation.
    
    Args:
        callee (Callable): Function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        *args: Arguments for callee
        **kwargs: Keyword arguments for callee
        
    Returns:
        YdbRetryOperationFinalResult: Operation result with metadata
    """

Error Classification

Utilities for categorizing and handling different error types.

def classify_error(error: Exception) -> ErrorCategory:
    """
    Classify error into category for handling strategy.
    
    Args:
        error (Exception): Error to classify
        
    Returns:
        ErrorCategory: Error category
    """

class ErrorCategory(enum.Enum):
    """Error classification categories."""
    RETRIABLE_FAST = "retriable_fast"        # Quick retry with fast backoff
    RETRIABLE_SLOW = "retriable_slow"        # Retry with slow backoff  
    RETRIABLE_UNCERTAIN = "retriable_uncertain"  # Uncertain outcome, careful retry
    NON_RETRIABLE = "non_retriable"          # Don't retry these errors
    FATAL = "fatal"                          # Fatal errors, stop immediately

def is_transport_error(error: Exception) -> bool:
    """
    Check if error is transport/network related.
    
    Args:
        error (Exception): Error to check
        
    Returns:
        bool: True if transport error
    """

def is_server_error(error: Exception) -> bool:
    """
    Check if error is server-side.
    
    Args:
        error (Exception): Error to check
        
    Returns:
        bool: True if server error
    """

def is_client_error(error: Exception) -> bool:
    """
    Check if error is client-side.
    
    Args:
        error (Exception): Error to check
        
    Returns:
        bool: True if client error
    """

def should_retry_error(
    error: Exception,
    retry_settings: RetrySettings = None
) -> bool:
    """
    Determine if error should be retried based on settings.
    
    Args:
        error (Exception): Error to evaluate
        retry_settings (RetrySettings, optional): Retry configuration
        
    Returns:
        bool: True if error should be retried
    """

def get_retry_backoff(
    error: Exception,
    attempt: int,
    retry_settings: RetrySettings = None
) -> float:
    """
    Calculate appropriate backoff time for error and attempt.
    
    Args:
        error (Exception): Error that occurred
        attempt (int): Retry attempt number
        retry_settings (RetrySettings, optional): Retry configuration
        
    Returns:
        float: Backoff time in seconds
    """

Error Context

Context management for error handling and debugging.

class ErrorContext:
    def __init__(
        self,
        operation: str = None,
        request_id: str = None,
        session_id: str = None,
        endpoint: str = None,
        database: str = None
    ):
        """
        Context information for error analysis.
        
        Args:
            operation (str, optional): Operation being performed
            request_id (str, optional): Request identifier
            session_id (str, optional): Session identifier
            endpoint (str, optional): YDB endpoint
            database (str, optional): Database path
        """

    @property
    def operation(self) -> Optional[str]:
        """Operation being performed."""

    @property
    def request_id(self) -> Optional[str]:
        """Request identifier."""

    @property
    def session_id(self) -> Optional[str]:
        """Session identifier."""

    def to_dict(self) -> dict:
        """Convert context to dictionary."""

    def __str__(self) -> str:
        """String representation of context."""

class ErrorHandler:
    def __init__(
        self,
        logger: logging.Logger = None,
        context: ErrorContext = None
    ):
        """
        Error handling utilities.
        
        Args:
            logger (logging.Logger, optional): Logger for error reporting
            context (ErrorContext, optional): Error context information
        """

    def handle_error(
        self,
        error: Exception,
        operation: str = None
    ) -> bool:
        """
        Handle error with appropriate logging and classification.
        
        Args:
            error (Exception): Error to handle
            operation (str, optional): Operation context
            
        Returns:
            bool: True if error was handled
        """

    def should_retry(
        self,
        error: Exception,
        attempt: int,
        max_retries: int = 10
    ) -> bool:
        """
        Determine if operation should be retried.
        
        Args:
            error (Exception): Error that occurred
            attempt (int): Current attempt number
            max_retries (int): Maximum retry attempts
            
        Returns:
            bool: True if should retry
        """

    def log_error(
        self,
        error: Exception,
        level: int = logging.ERROR,
        extra_context: dict = None
    ):
        """
        Log error with context information.
        
        Args:
            error (Exception): Error to log
            level (int): Log level
            extra_context (dict, optional): Additional context
        """

Usage Examples

Basic Error Handling

import ydb
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def handle_ydb_operations():
    driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
    
    try:
        driver.wait(fail_fast=True, timeout=5)
        session_pool = ydb.SessionPool(driver)
        
        def execute_query(session):
            return session.execute_query("SELECT COUNT(*) FROM users")
        
        # Execute with automatic retry
        result = session_pool.retry_operation_sync(execute_query)
        
    except ydb.ConnectionError as e:
        logger.error(f"Connection failed: {e}")
        # Handle connection issues - maybe use backup endpoint
        
    except ydb.UnauthorizedError as e:
        logger.error(f"Authentication failed: {e}")
        # Handle auth issues - refresh credentials
        
    except ydb.NotFoundError as e:
        logger.error(f"Resource not found: {e}")
        # Handle missing resources - create or use default
        
    except ydb.RetryableError as e:
        logger.warning(f"Retriable error occurred: {e}")
        # These are handled automatically by retry_operation_sync
        
    except ydb.Error as e:
        logger.error(f"YDB error: {e}")
        logger.error(f"Status code: {e.status}")
        for issue in e.issues:
            logger.error(f"Issue: {issue.message}")
            
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        
    finally:
        if 'session_pool' in locals():
            session_pool.stop()
        if 'driver' in locals():
            driver.stop()

Custom Retry Configuration

# Configure custom retry behavior
def configure_custom_retries():
    # Fast backoff for quick operations
    fast_backoff = ydb.BackoffSettings(
        slot_duration=0.001,  # 1ms base
        ceiling=8,            # Up to 256ms
        max_backoff=0.5,      # Max 500ms
        jitter_limit=0.1      # 10% jitter
    )
    
    # Slow backoff for heavy operations
    slow_backoff = ydb.BackoffSettings(
        slot_duration=2.0,    # 2s base
        ceiling=4,            # Up to 32s
        max_backoff=60.0,     # Max 1 minute
        jitter_limit=0.2      # 20% jitter
    )
    
    # Custom retry settings
    retry_settings = ydb.RetrySettings(
        max_retries=5,
        fast_backoff_settings=fast_backoff,
        slow_backoff_settings=slow_backoff,
        retry_not_found=False,  # Don't retry NOT_FOUND
        retry_internal_error=True,  # Retry internal errors
        on_ydb_error_callback=lambda error: logger.warning(f"Retrying after: {error}")
    )
    
    return retry_settings

# Use custom retry settings
custom_retry_settings = configure_custom_retries()

def robust_operation(session):
    # This operation will use custom retry behavior
    return session.execute_query(
        "SELECT * FROM large_table WHERE complex_condition = true"
    )

result = session_pool.retry_operation_sync(
    robust_operation,
    retry_settings=custom_retry_settings
)

Error Classification and Handling

def classify_and_handle_error(error: Exception) -> bool:
    """
    Classify error and determine handling strategy.
    
    Returns:
        bool: True if operation should continue, False if should abort
    """
    
    if isinstance(error, ydb.BadRequestError):
        logger.error(f"Bad request - fix query: {error}")
        return False  # Don't continue with bad requests
        
    elif isinstance(error, ydb.UnauthorizedError):
        logger.error(f"Auth failed - refresh credentials: {error}")
        # Could refresh credentials here
        return False
        
    elif isinstance(error, ydb.NotFoundError):
        logger.warning(f"Resource not found: {error}")
        # Might create missing resource
        return True
        
    elif isinstance(error, ydb.OverloadedError):
        logger.warning(f"Service overloaded: {error}")
        # Implement backoff strategy
        import time
        time.sleep(5.0)  # Wait before retrying
        return True
        
    elif isinstance(error, ydb.SessionExpiredError):
        logger.info(f"Session expired - will get new session: {error}")
        return True  # Session pool will handle
        
    elif isinstance(error, ydb.ConnectionError):
        logger.warning(f"Connection issue: {error}")
        # Could try alternative endpoint
        return True
        
    elif isinstance(error, ydb.RetryableError):
        logger.info(f"Retriable error: {error}")
        return True
        
    else:
        logger.error(f"Non-retriable error: {error}")
        return False

# Example usage with manual retry logic
def manual_retry_operation(operation_func, max_attempts=3):
    for attempt in range(max_attempts):
        try:
            return operation_func()
            
        except Exception as e:
            should_continue = classify_and_handle_error(e)
            
            if not should_continue or attempt == max_attempts - 1:
                raise  # Re-raise if shouldn't continue or final attempt
                
            # Calculate backoff
            backoff_time = min(2 ** attempt, 10)  # Exponential backoff, max 10s
            logger.info(f"Retrying in {backoff_time}s (attempt {attempt + 1}/{max_attempts})")
            time.sleep(backoff_time)

Async Error Handling

import asyncio
import ydb.aio as ydb_aio

async def async_error_handling():
    """Demonstrate async error handling patterns."""
    
    async with ydb_aio.Driver(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as driver:
        
        try:
            await driver.wait(fail_fast=True, timeout=5)
            
            async with ydb_aio.SessionPool(driver) as pool:
                
                async def async_operation(session):
                    return await session.execute_query(
                        "SELECT * FROM users WHERE active = true"
                    )
                
                # Use async retry
                result = await pool.retry_operation(async_operation)
                
        except ydb.ConnectionError as e:
            logger.error(f"Async connection failed: {e}")
            # Handle async connection issues
            
        except ydb.TimeoutError as e:
            logger.error(f"Async operation timed out: {e}")
            # Handle timeout with alternative strategy
            
        except asyncio.CancelledError:
            logger.info("Operation cancelled")
            raise  # Re-raise cancellation
            
        except Exception as e:
            logger.error(f"Unexpected async error: {e}")

# Run async error handling
asyncio.run(async_error_handling())

Error Recovery Strategies

class ErrorRecoveryManager:
    """Manages error recovery strategies for YDB operations."""
    
    def __init__(self, driver: ydb.Driver):
        self.driver = driver
        self.session_pool = ydb.SessionPool(driver)
        self.fallback_data = {}
        self.circuit_breaker_failures = 0
        self.circuit_breaker_threshold = 5
        self.circuit_breaker_reset_time = 60
        self.last_failure_time = 0
        
    def execute_with_recovery(self, operation_func, fallback_func=None):
        """Execute operation with comprehensive recovery strategy."""
        
        # Check circuit breaker
        if self._is_circuit_breaker_open():
            logger.warning("Circuit breaker open, using fallback")
            return self._execute_fallback(fallback_func)
        
        try:
            # Attempt primary operation
            result = self.session_pool.retry_operation_sync(
                operation_func,
                retry_settings=ydb.RetrySettings(max_retries=3)
            )
            
            # Success - reset circuit breaker
            self.circuit_breaker_failures = 0
            return result
            
        except ydb.OverloadedError:
            # Specific handling for overload
            logger.warning("Service overloaded, implementing backoff")
            self._handle_overload()
            return self._execute_fallback(fallback_func)
            
        except ydb.SessionPoolEmptyError:
            # Handle session pool exhaustion
            logger.warning("Session pool exhausted, creating new pool")
            self._recreate_session_pool()
            # Retry once with new pool
            try:
                return self.session_pool.retry_operation_sync(operation_func)
            except Exception:
                return self._execute_fallback(fallback_func)
                
        except ydb.ConnectionError:
            # Handle connection issues
            logger.error("Connection failed, trying endpoint discovery")
            self._handle_connection_failure()
            return self._execute_fallback(fallback_func)
            
        except ydb.RetryableError as e:
            # Track retriable failures for circuit breaker
            self.circuit_breaker_failures += 1
            self.last_failure_time = time.time()
            logger.error(f"Retriable error after retries: {e}")
            return self._execute_fallback(fallback_func)
            
        except Exception as e:
            logger.error(f"Unhandled error: {e}")
            return self._execute_fallback(fallback_func)
    
    def _is_circuit_breaker_open(self) -> bool:
        """Check if circuit breaker should be open."""
        if self.circuit_breaker_failures < self.circuit_breaker_threshold:
            return False
            
        # Check if enough time has passed to reset
        if time.time() - self.last_failure_time > self.circuit_breaker_reset_time:
            self.circuit_breaker_failures = 0
            return False
            
        return True
    
    def _execute_fallback(self, fallback_func):
        """Execute fallback strategy."""
        if fallback_func:
            try:
                return fallback_func()
            except Exception as e:
                logger.error(f"Fallback also failed: {e}")
        
        # Return cached/default data
        logger.info("Using cached fallback data")
        return self.fallback_data.get("default", [])
    
    def _handle_overload(self):
        """Handle service overload with exponential backoff."""
        backoff_time = min(2 ** self.circuit_breaker_failures, 30)
        logger.info(f"Backing off for {backoff_time}s due to overload")
        time.sleep(backoff_time)
    
    def _recreate_session_pool(self):
        """Recreate session pool if exhausted."""
        try:
            self.session_pool.stop()
            self.session_pool = ydb.SessionPool(self.driver, size=20)
        except Exception as e:
            logger.error(f"Failed to recreate session pool: {e}")
    
    def _handle_connection_failure(self):
        """Handle connection failures with endpoint rotation."""
        # Could implement endpoint discovery refresh here
        logger.info("Connection failure - would refresh endpoints")

# Usage example
recovery_manager = ErrorRecoveryManager(driver)

def get_user_data(session):
    return session.execute_query("SELECT * FROM users LIMIT 100")

def fallback_user_data():
    # Return cached or default user data
    return [{"id": 1, "name": "Default User"}]

# Execute with comprehensive error recovery
user_data = recovery_manager.execute_with_recovery(
    get_user_data,
    fallback_user_data
)

Error Monitoring and Metrics

from collections import defaultdict
import time

class ErrorMetricsCollector:
    """Collect and report error metrics for monitoring."""
    
    def __init__(self):
        self.error_counts = defaultdict(int)
        self.error_rates = defaultdict(list)
        self.retry_counts = defaultdict(int)
        self.start_time = time.time()
    
    def record_error(self, error: Exception, operation: str = "unknown"):
        """Record error occurrence for metrics."""
        error_type = type(error).__name__
        current_time = time.time()
        
        # Count by error type
        self.error_counts[error_type] += 1
        
        # Track error rates (errors per minute)
        self.error_rates[error_type].append(current_time)
        
        # Clean old entries (keep last hour)
        cutoff_time = current_time - 3600
        self.error_rates[error_type] = [
            t for t in self.error_rates[error_type] if t > cutoff_time
        ]
        
        # Log structured error info
        logger.info(
            "error_occurred",
            extra={
                "error_type": error_type,
                "operation": operation,
                "error_message": str(error),
                "status_code": getattr(error, 'status', None)
            }
        )
    
    def record_retry(self, error_type: str, attempt: int):
        """Record retry attempt."""
        self.retry_counts[f"{error_type}_retry_{attempt}"] += 1
    
    def get_error_summary(self) -> dict:
        """Get current error summary for monitoring."""
        current_time = time.time()
        uptime = current_time - self.start_time
        
        summary = {
            "uptime_seconds": uptime,
            "total_errors": sum(self.error_counts.values()),
            "error_counts": dict(self.error_counts),
            "retry_counts": dict(self.retry_counts),
            "error_rates_per_minute": {}
        }
        
        # Calculate error rates per minute
        for error_type, timestamps in self.error_rates.items():
            recent_errors = len([t for t in timestamps if current_time - t < 60])
            summary["error_rates_per_minute"][error_type] = recent_errors
        
        return summary
    
    def should_alert(self) -> bool:
        """Check if error rates warrant alerting."""
        current_time = time.time()
        
        # Alert if more than 10 errors per minute for any type
        for error_type, timestamps in self.error_rates.items():
            recent_errors = len([t for t in timestamps if current_time - t < 60])
            if recent_errors > 10:
                logger.warning(f"High error rate: {recent_errors}/min for {error_type}")
                return True
        
        return False

# Global metrics collector
metrics = ErrorMetricsCollector()

# Enhanced retry operation with metrics
def retry_with_metrics(operation_func, operation_name="unknown"):
    """Execute operation with error metrics collection."""
    
    for attempt in range(3):
        try:
            return operation_func()
            
        except Exception as e:
            # Record error metrics
            metrics.record_error(e, operation_name)
            metrics.record_retry(type(e).__name__, attempt + 1)
            
            if attempt == 2:  # Final attempt
                raise
            
            # Check if we should alert on error rates
            if metrics.should_alert():
                logger.critical("High error rates detected - consider investigation")

# Usage with metrics
def monitored_database_operation():
    def db_query(session):
        return session.execute_query("SELECT COUNT(*) FROM orders")
    
    return retry_with_metrics(
        lambda: session_pool.retry_operation_sync(db_query),
        "order_count_query"
    )

# Periodic metrics reporting
def report_metrics():
    summary = metrics.get_error_summary()
    logger.info("error_summary", extra=summary)
    
    # Could send to monitoring system here
    # send_to_monitoring(summary)

# Schedule periodic reporting
import threading
def periodic_reporting():
    while True:
        time.sleep(300)  # Report every 5 minutes
        report_metrics()

reporting_thread = threading.Thread(target=periodic_reporting, daemon=True)
reporting_thread.start()

Type Definitions

# Type aliases for error handling
ErrorHandler = Callable[[Exception], bool]
ErrorCallback = Callable[[Exception], None]
RetryDecision = bool
BackoffTime = float

# Error classification
ErrorClassifier = Callable[[Exception], ErrorCategory]
RetryStrategy = Callable[[Exception, int], RetryDecision]
BackoffCalculator = Callable[[Exception, int], BackoffTime]

# Monitoring types
ErrorMetric = Dict[str, Union[int, float, str]]
MetricsReporter = Callable[[ErrorMetric], None]
AlertTrigger = Callable[[ErrorMetric], bool]

Install with Tessl CLI

npx tessl i tessl/pypi-ydb

docs

async-operations.md

authentication.md

data-types.md

dbapi-interface.md

driver-connection.md

error-handling.md

index.md

query-service.md

schema-operations.md

sqlalchemy-integration.md

table-operations.md

topic-operations.md

tile.json