Officially supported Python client for the YDB distributed SQL database.
Comprehensive error handling with detailed error hierarchies, retry strategies, backoff configurations, and operation result processing.
The YDB Python SDK provides a comprehensive error hierarchy for handling different types of failures.
class Error(Exception):
    """Base class for all YDB errors.

    Carries a human-readable message plus an optional list of structured
    IssueMessage details returned by the server.
    """

    # Overridden by subclasses with the matching StatusCode value; None on
    # the base class means "no specific status".
    status = None

    def __init__(
        self,
        message: str,
        issues: Optional[Iterable["IssueMessage"]] = None,
    ):
        """
        Base YDB error.

        Args:
            message (str): Error message
            issues (Optional[Iterable[IssueMessage]]): Detailed error issues
        """
        super().__init__(message)
        self.message = message
        # Stored under a private name: the original assigned ``self.issues``
        # while also declaring an ``issues`` property with no setter, which
        # would raise AttributeError at construction time.
        self._issues: List["IssueMessage"] = list(issues) if issues else []

    @property
    def issues(self) -> List["IssueMessage"]:
        """Detailed error issues (empty list when the server sent none)."""
        return self._issues
class RetryableError(Error):
    """Marker base class: errors deriving from this may safely be retried."""
# NOTE(review): these classes read StatusCode at class-creation time, so
# StatusCode must be defined before them in the real module — in this chunk
# it appears later; confirm ordering.
class BadRequestError(Error):
    """Request validation or syntax errors."""

    status = StatusCode.BAD_REQUEST


class UnauthorizedError(Error):
    """Authentication errors."""

    status = StatusCode.UNAUTHORIZED


class ForbiddenError(Error):
    """Authorization/permission errors."""

    status = StatusCode.FORBIDDEN


class NotFoundError(Error):
    """Resource not found errors."""

    status = StatusCode.NOT_FOUND


class AlreadyExistsError(Error):
    """Resource already exists errors."""

    status = StatusCode.ALREADY_EXISTS


class PreconditionFailedError(RetryableError):
    """Precondition check failures."""

    status = StatusCode.PRECONDITION_FAILED


class AbortedError(RetryableError):
    """Aborted operations due to conflicts."""

    status = StatusCode.ABORTED


class UnavailableError(RetryableError):
    """Service temporarily unavailable."""

    status = StatusCode.UNAVAILABLE


class OverloadedError(RetryableError):
    """Service overloaded, backoff required."""

    status = StatusCode.OVERLOADED


class TimeoutError(RetryableError):
    """Operation timeout errors.

    NOTE: shadows the builtin ``TimeoutError``; reference it through the
    package namespace (``ydb.TimeoutError``) to avoid confusion.
    """

    status = StatusCode.TIMEOUT


class CancelledError(Error):
    """Cancelled operations."""

    status = StatusCode.CANCELLED


class UndeterminedError(RetryableError):
    """Operations with undetermined outcome."""

    status = StatusCode.UNDETERMINED


class InternalError(RetryableError):
    """Internal service errors."""

    status = StatusCode.INTERNAL_ERROR


class UnsupportedError(Error):
    """Unsupported operations."""

    status = StatusCode.UNSUPPORTED


class SchemeError(Error):
    """Schema-related errors."""

    status = StatusCode.SCHEME_ERROR


class GenericError(Error):
    """Generic/unclassified errors."""

    status = StatusCode.GENERIC_ERROR


# Errors related to session lifecycle and management.
class BadSessionError(RetryableError):
    """Invalid or corrupted session state."""

    status = StatusCode.BAD_SESSION


class SessionExpiredError(RetryableError):
    """Session has expired and needs renewal."""

    status = StatusCode.SESSION_EXPIRED


class SessionBusyError(RetryableError):
    """Session is busy with another operation."""

    status = StatusCode.SESSION_BUSY


class SessionPoolEmptyError(RetryableError):
    """No available sessions in the pool."""

    status = StatusCode.SESSION_POOL_EMPTY


class SessionPoolClosedError(Error):
    """Session pool has been closed."""

    status = StatusCode.SESSION_POOL_CLOSED


class QueryCacheEmptyError(RetryableError):
    """Query not found in prepared query cache."""
    # NOTE(review): no StatusCode constant is assigned here in the source —
    # confirm whether a dedicated code exists for this error.


# Network and transport-level error conditions.
class ConnectionError(RetryableError):
    """Base class for connection-related errors.

    NOTE: shadows the builtin ``ConnectionError``; reference it through the
    package namespace (``ydb.ConnectionError``) to avoid confusion.
    """


class ConnectionLostError(ConnectionError):
    """Connection lost during operation."""

    status = StatusCode.CONNECTION_LOST


class ConnectionFailureError(ConnectionError):
    """Failed to establish connection."""

    status = StatusCode.CONNECTION_FAILURE


class DeadlineExceededError(ConnectionError):
    """Operation exceeded deadline."""

    status = StatusCode.DEADLINE_EXCEEDED


class ClientInternalError(Error):
    """Client-side internal errors."""

    status = StatusCode.CLIENT_INTERNAL_ERROR


class ClientResourceExhaustedError(Error):
    """Client resources exhausted."""
    # NOTE(review): no StatusCode constant assigned in the source.


class ClientDiscoveryError(ConnectionError):
    """Endpoint discovery failures."""
    # NOTE(review): no StatusCode constant assigned in the source.


class UnauthenticatedError(Error):
    """Authentication credential errors."""

    status = StatusCode.UNAUTHENTICATED


class UnimplementedError(Error):
    """Unimplemented features or operations."""

    status = StatusCode.UNIMPLEMENTED


# Enumeration of all possible YDB status codes.
class StatusCode(enum.IntEnum):
    """YDB operation status codes."""

    # Success
    SUCCESS = 0

    # Client errors (4xx equivalent)
    BAD_REQUEST = 400
    UNAUTHORIZED = 401
    FORBIDDEN = 403
    NOT_FOUND = 404
    ALREADY_EXISTS = 409
    PRECONDITION_FAILED = 412
    UNSUPPORTED = 501

    # Server errors (5xx equivalent)
    INTERNAL_ERROR = 500
    UNAVAILABLE = 503
    TIMEOUT = 504
    # BUG FIX: OVERLOADED was 503, identical to UNAVAILABLE — IntEnum treats
    # equal values as aliases, so StatusCode.OVERLOADED *was*
    # StatusCode.UNAVAILABLE. 429 (Too Many Requests) keeps the HTTP-like
    # scheme while making the member distinct.
    OVERLOADED = 429

    # YDB-specific
    ABORTED = 10
    CANCELLED = 1
    UNDETERMINED = 2
    SCHEME_ERROR = 20
    GENERIC_ERROR = 21
    BAD_SESSION = 30
    SESSION_EXPIRED = 31
    SESSION_BUSY = 32

    # Transport errors
    CONNECTION_LOST = 401010
    CONNECTION_FAILURE = 401020
    DEADLINE_EXCEEDED = 401030
    CLIENT_INTERNAL_ERROR = 401040
    UNIMPLEMENTED = 401050

    # Client pool errors
    UNAUTHENTICATED = 402030
    SESSION_POOL_EMPTY = 402040
    SESSION_POOL_CLOSED = 402050
def is_retryable_error(error: Exception) -> bool:
    """
    Check if error is retryable.

    Args:
        error (Exception): Error to check

    Returns:
        bool: True if error can be retried
    """
    # Retryability is encoded in the class hierarchy: anything deriving from
    # RetryableError may be retried.
    return isinstance(error, RetryableError)
def get_error_class(status_code: int) -> Type[Error]:
    """
    Get error class for status code.

    Args:
        status_code (int): YDB status code

    Returns:
        Type[Error]: Appropriate error class; GenericError for unknown codes.
    """
    # Walk the Error hierarchy looking for a class whose ``status`` matches.
    pending = list(Error.__subclasses__())
    while pending:
        cls = pending.pop()
        pending.extend(cls.__subclasses__())
        if getattr(cls, "status", None) == status_code:
            return cls
    return GenericError


# Detailed error information with nested issue structures.
class IssueMessage:
    """Detailed error issue information, possibly with nested sub-issues."""

    def __init__(
        self,
        message: str,
        issue_code: Optional[int] = None,
        severity: Optional[int] = None,
        issues: Optional[List["IssueMessage"]] = None,
    ):
        """
        Detailed error issue information.

        Args:
            message (str): Issue description
            issue_code (int, optional): Issue-specific code
            severity (int, optional): Issue severity level
            issues (List[IssueMessage], optional): Nested sub-issues
        """
        self._message = message
        self._issue_code = issue_code
        self._severity = severity
        # Copy to a fresh list so callers cannot mutate our state via the
        # argument, and so ``None`` normalizes to an empty list.
        self._issues: List["IssueMessage"] = list(issues) if issues else []

    @property
    def message(self) -> str:
        """Issue description message."""
        return self._message

    @property
    def issue_code(self) -> Optional[int]:
        """Issue-specific code."""
        return self._issue_code

    @property
    def severity(self) -> Optional[int]:
        """Issue severity level."""
        return self._severity

    @property
    def issues(self) -> List["IssueMessage"]:
        """Nested sub-issues."""
        return self._issues

    def to_dict(self) -> dict:
        """
        Convert issue to dictionary representation, recursing into sub-issues.

        Returns:
            dict: Issue as dictionary
        """
        return {
            "message": self._message,
            "issue_code": self._issue_code,
            "severity": self._severity,
            "issues": [issue.to_dict() for issue in self._issues],
        }

    def __str__(self) -> str:
        """String representation of the issue."""
        details = []
        if self._issue_code is not None:
            details.append(f"code={self._issue_code}")
        if self._severity is not None:
            details.append(f"severity={self._severity}")
        suffix = f" ({', '.join(details)})" if details else ""
        return f"{self._message}{suffix}"
class IssueSeverity(enum.IntEnum):
    """Issue severity levels, ordered from least to most severe."""

    INFO = 1
    NOTICE = 2
    WARNING = 3
    ERROR = 4
    FATAL = 5


# Configurable retry strategies with exponential backoff and jitter.
class RetrySettings:
    """Retry configuration for YDB operations (immutable-style: ``with_*``
    methods return modified copies)."""

    def __init__(
        self,
        max_retries: int = 10,
        max_session_acquire_timeout: Optional[float] = None,
        fast_backoff_settings: Optional["BackoffSettings"] = None,
        slow_backoff_settings: Optional["BackoffSettings"] = None,
        retry_not_found: bool = False,
        retry_internal_error: bool = True,
        unknown_error_handler: Optional[Callable[[Exception], bool]] = None,
        # NOTE(review): the original annotated this with ``YdbError``, which
        # is not defined in this chunk — presumably the Error base; confirm.
        on_ydb_error_callback: Optional[Callable[["Error"], None]] = None,
    ):
        """
        Retry configuration for YDB operations.

        Args:
            max_retries (int): Maximum number of retry attempts
            max_session_acquire_timeout (float, optional): Session acquisition timeout
            fast_backoff_settings (BackoffSettings, optional): Fast retry backoff
            slow_backoff_settings (BackoffSettings, optional): Slow retry backoff
            retry_not_found (bool): Whether to retry NotFound errors
            retry_internal_error (bool): Whether to retry internal errors
            unknown_error_handler (Callable, optional): Handler for unknown errors
            on_ydb_error_callback (Callable, optional): Error callback function
        """
        self._max_retries = max_retries
        self.max_session_acquire_timeout = max_session_acquire_timeout
        self._fast_backoff_settings = fast_backoff_settings
        self._slow_backoff_settings = slow_backoff_settings
        self.retry_not_found = retry_not_found
        self.retry_internal_error = retry_internal_error
        self.unknown_error_handler = unknown_error_handler
        self.on_ydb_error_callback = on_ydb_error_callback

    @property
    def max_retries(self) -> int:
        """Maximum retry attempts."""
        return self._max_retries

    @property
    def fast_backoff_settings(self) -> Optional["BackoffSettings"]:
        """Fast backoff configuration (None if not configured)."""
        return self._fast_backoff_settings

    @property
    def slow_backoff_settings(self) -> Optional["BackoffSettings"]:
        """Slow backoff configuration (None if not configured)."""
        return self._slow_backoff_settings

    def _replace(self, **overrides) -> "RetrySettings":
        """Return a copy of these settings with the given fields replaced."""
        fields = dict(
            max_retries=self._max_retries,
            max_session_acquire_timeout=self.max_session_acquire_timeout,
            fast_backoff_settings=self._fast_backoff_settings,
            slow_backoff_settings=self._slow_backoff_settings,
            retry_not_found=self.retry_not_found,
            retry_internal_error=self.retry_internal_error,
            unknown_error_handler=self.unknown_error_handler,
            on_ydb_error_callback=self.on_ydb_error_callback,
        )
        fields.update(overrides)
        return RetrySettings(**fields)

    def with_max_retries(self, max_retries: int) -> "RetrySettings":
        """
        Create copy with different max retries.

        Args:
            max_retries (int): New max retry count

        Returns:
            RetrySettings: New retry settings instance
        """
        return self._replace(max_retries=max_retries)

    def with_fast_backoff(self, backoff: "BackoffSettings") -> "RetrySettings":
        """
        Create copy with different fast backoff.

        Args:
            backoff (BackoffSettings): New fast backoff settings

        Returns:
            RetrySettings: New retry settings instance
        """
        return self._replace(fast_backoff_settings=backoff)
class BackoffSettings:
    """Exponential backoff configuration with a slot ceiling and jitter."""

    def __init__(
        self,
        slot_duration: float = 1.0,
        ceiling: int = 6,
        max_backoff: float = 32.0,
        jitter_limit: float = 1.0,
        uncertain_ratio: float = 0.1,
    ):
        """
        Exponential backoff configuration.

        Args:
            slot_duration (float): Base slot duration in seconds
            ceiling (int): Backoff ceiling (2^ceiling * slot_duration max)
            max_backoff (float): Maximum backoff time in seconds
            jitter_limit (float): Maximum jitter multiplier
            uncertain_ratio (float): Ratio for uncertain error handling
        """
        self._slot_duration = slot_duration
        self._ceiling = ceiling
        self._max_backoff = max_backoff
        self.jitter_limit = jitter_limit
        self.uncertain_ratio = uncertain_ratio

    @property
    def slot_duration(self) -> float:
        """Base slot duration."""
        return self._slot_duration

    @property
    def ceiling(self) -> int:
        """Backoff ceiling exponent."""
        return self._ceiling

    @property
    def max_backoff(self) -> float:
        """Maximum backoff time."""
        return self._max_backoff

    def calculate_backoff(self, attempt: int) -> float:
        """
        Calculate backoff time for attempt.

        Args:
            attempt (int): Retry attempt number (0-based)

        Returns:
            float: Backoff time in seconds, capped at max_backoff
        """
        # 2^min(attempt, ceiling) slots, then clamp to the hard maximum.
        slots = 1 << min(attempt, self._ceiling)
        return min(slots * self._slot_duration, self._max_backoff)

    def with_jitter(self, backoff: float) -> float:
        """
        Apply jitter to backoff time.

        Args:
            backoff (float): Base backoff time

        Returns:
            float: Jittered backoff time in [backoff, backoff * (1 + jitter_limit)]
        """
        # NOTE(review): jitter may push the result above max_backoff — confirm
        # whether the real SDK clamps after jittering.
        return backoff * (1.0 + random.uniform(0.0, self.jitter_limit))


# Predefined backoff settings
DEFAULT_FAST_BACKOFF = BackoffSettings(slot_duration=0.005, ceiling=10, max_backoff=0.2)
DEFAULT_SLOW_BACKOFF = BackoffSettings(slot_duration=1.0, ceiling=6, max_backoff=32.0)

# High-level retry functionality for database operations.
def retry_operation_sync(
    callee: Callable[..., Any],
    retry_settings: Optional["RetrySettings"] = None,
    session_pool: Optional["SessionPool"] = None,
    *args,
    **kwargs,
) -> Any:
    """
    Execute operation with retry logic.

    Args:
        callee (Callable): Function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        session_pool (SessionPool, optional): Session pool for session-based operations
        *args: Arguments for callee
        **kwargs: Keyword arguments for callee

    Returns:
        Any: Result of successful callee execution

    Raises:
        Error: Final error if all retries exhausted
    """
    # getattr with a default lets this work with retry_settings=None without
    # constructing RetrySettings here.
    max_retries = getattr(retry_settings, "max_retries", 10)
    attempt = 0
    while True:
        try:
            # NOTE(review): session acquisition from session_pool is not shown
            # in this chunk — here callee is invoked directly; confirm against
            # the real SDK's session-based calling convention.
            return callee(*args, **kwargs)
        except RetryableError as exc:
            callback = getattr(retry_settings, "on_ydb_error_callback", None)
            if callback is not None:
                callback(exc)
            if attempt >= max_retries:
                raise
            # Simple capped exponential sleep between attempts.
            time.sleep(min((1 << attempt) * 0.005, 1.0))
            attempt += 1
async def retry_operation(
    callee: Callable[..., Awaitable[Any]],
    retry_settings: Optional["RetrySettings"] = None,
    session_pool: Optional["SessionPool"] = None,
    *args,
    **kwargs,
) -> Any:
    """
    Execute async operation with retry logic.

    Args:
        callee (Callable): Async function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        session_pool (SessionPool, optional): Session pool for session-based operations
        *args: Arguments for callee
        **kwargs: Keyword arguments for callee

    Returns:
        Any: Result of successful callee execution

    Raises:
        Error: Final error if all retries exhausted
    """
    max_retries = getattr(retry_settings, "max_retries", 10)
    attempt = 0
    while True:
        try:
            # NOTE(review): session acquisition from session_pool is not shown
            # in this chunk — callee is awaited directly; confirm.
            return await callee(*args, **kwargs)
        except RetryableError as exc:
            callback = getattr(retry_settings, "on_ydb_error_callback", None)
            if callback is not None:
                callback(exc)
            if attempt >= max_retries:
                raise
            # Non-blocking capped exponential sleep between attempts.
            await asyncio.sleep(min((1 << attempt) * 0.005, 1.0))
            attempt += 1
class YdbRetryOperationSleepOpt:
    """Sleep options for retry operations (plain value holder)."""

    def __init__(
        self,
        timeout: Optional[float] = None,
        backoff_settings: Optional["BackoffSettings"] = None,
    ):
        """
        Sleep options for retry operations.

        Args:
            timeout (float, optional): Maximum sleep time
            backoff_settings (BackoffSettings, optional): Backoff configuration
        """
        self.timeout = timeout
        self.backoff_settings = backoff_settings
class YdbRetryOperationFinalResult:
    """Final result of a retry operation: either a value or a terminal error."""

    def __init__(
        self,
        result: Any = None,
        error: Optional[Exception] = None,
        attempts: int = 0,
    ):
        """
        Final result of retry operation.

        Args:
            result (Any, optional): Operation result if successful
            error (Exception, optional): Final error if failed
            attempts (int): Number of attempts made
        """
        self.result = result
        self.error = error
        self.attempts = attempts

    @property
    def is_success(self) -> bool:
        """True if operation succeeded (no terminal error recorded)."""
        return self.error is None

    @property
    def is_failure(self) -> bool:
        """True if operation failed."""
        return self.error is not None
def retry_operation_impl(
    callee: Callable,
    retry_settings: Optional["RetrySettings"] = None,
    *args,
    **kwargs,
) -> "YdbRetryOperationFinalResult":
    """
    Low-level retry implementation: never raises, always returns a result object.

    Args:
        callee (Callable): Function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        *args: Arguments for callee
        **kwargs: Keyword arguments for callee

    Returns:
        YdbRetryOperationFinalResult: Operation result with metadata
    """
    max_retries = getattr(retry_settings, "max_retries", 10)
    attempts = 0
    last_error = None
    while attempts <= max_retries:
        attempts += 1
        try:
            value = callee(*args, **kwargs)
            return YdbRetryOperationFinalResult(result=value, attempts=attempts)
        except RetryableError as exc:
            # Remember and loop for another attempt.
            last_error = exc
        except Exception as exc:
            # Non-retryable: terminate immediately with the error captured.
            return YdbRetryOperationFinalResult(error=exc, attempts=attempts)
    return YdbRetryOperationFinalResult(error=last_error, attempts=attempts)


# Utilities for categorizing and handling different error types.
def classify_error(error: Exception) -> "ErrorCategory":
    """
    Classify error into category for handling strategy.

    Args:
        error (Exception): Error to classify

    Returns:
        ErrorCategory: Error category
    """
    # NOTE(review): mapping inferred from the hierarchy and category comments
    # above — confirm against the SDK's own classification.
    if isinstance(error, (OverloadedError, UnavailableError, SessionPoolEmptyError)):
        return ErrorCategory.RETRIABLE_SLOW
    if isinstance(error, (UndeterminedError, TimeoutError, ConnectionError)):
        return ErrorCategory.RETRIABLE_UNCERTAIN
    if isinstance(error, RetryableError):
        return ErrorCategory.RETRIABLE_FAST
    if isinstance(error, ClientInternalError):
        return ErrorCategory.FATAL
    return ErrorCategory.NON_RETRIABLE
class ErrorCategory(enum.Enum):
    """Error classification categories."""

    RETRIABLE_FAST = "retriable_fast"            # Quick retry with fast backoff
    RETRIABLE_SLOW = "retriable_slow"            # Retry with slow backoff
    RETRIABLE_UNCERTAIN = "retriable_uncertain"  # Uncertain outcome, careful retry
    NON_RETRIABLE = "non_retriable"              # Don't retry these errors
    FATAL = "fatal"                              # Fatal errors, stop immediately
def is_transport_error(error: Exception) -> bool:
    """
    Check if error is transport/network related.

    Args:
        error (Exception): Error to check

    Returns:
        bool: True if transport error
    """
    # ConnectionError is the root of the transport branch
    # (ConnectionLost / ConnectionFailure / DeadlineExceeded / ClientDiscovery).
    return isinstance(error, ConnectionError)


def is_server_error(error: Exception) -> bool:
    """
    Check if error is server-side.

    Args:
        error (Exception): Error to check

    Returns:
        bool: True if server error
    """
    # NOTE(review): list inferred from the hierarchy above — confirm.
    return isinstance(
        error,
        (InternalError, UnavailableError, OverloadedError, UndeterminedError, AbortedError),
    )


def is_client_error(error: Exception) -> bool:
    """
    Check if error is client-side.

    Args:
        error (Exception): Error to check

    Returns:
        bool: True if client error
    """
    # NOTE(review): list inferred from the hierarchy above — confirm.
    return isinstance(
        error,
        (
            BadRequestError,
            UnauthorizedError,
            ForbiddenError,
            NotFoundError,
            AlreadyExistsError,
            UnsupportedError,
            ClientInternalError,
            ClientResourceExhaustedError,
        ),
    )
def should_retry_error(
    error: Exception,
    retry_settings: Optional["RetrySettings"] = None,
) -> bool:
    """
    Determine if error should be retried based on settings.

    Args:
        error (Exception): Error to evaluate
        retry_settings (RetrySettings, optional): Retry configuration

    Returns:
        bool: True if error should be retried
    """
    # Settings flags override the default hierarchy-based answer for these
    # two specific error types.
    if isinstance(error, NotFoundError):
        return bool(getattr(retry_settings, "retry_not_found", False))
    if isinstance(error, InternalError):
        return bool(getattr(retry_settings, "retry_internal_error", True))
    if isinstance(error, RetryableError):
        return True
    # Unknown (non-YDB) errors may be delegated to a user-supplied handler.
    handler = getattr(retry_settings, "unknown_error_handler", None)
    if handler is not None and not isinstance(error, Error):
        return bool(handler(error))
    return False


def get_retry_backoff(
    error: Exception,
    attempt: int,
    retry_settings: Optional["RetrySettings"] = None,
) -> float:
    """
    Calculate appropriate backoff time for error and attempt.

    Args:
        error (Exception): Error that occurred
        attempt (int): Retry attempt number
        retry_settings (RetrySettings, optional): Retry configuration

    Returns:
        float: Backoff time in seconds (jitter applied)
    """
    # Overload/unavailability warrants slow backoff; everything else fast.
    slow_errors = (OverloadedError, UnavailableError)
    use_slow = isinstance(error, slow_errors)
    settings = getattr(
        retry_settings,
        "slow_backoff_settings" if use_slow else "fast_backoff_settings",
        None,
    )
    if settings is None:
        settings = DEFAULT_SLOW_BACKOFF if use_slow else DEFAULT_FAST_BACKOFF
    return settings.with_jitter(settings.calculate_backoff(attempt))


# Context management for error handling and debugging.
class ErrorContext:
    """Context information for error analysis (operation, ids, endpoint)."""

    def __init__(
        self,
        operation: Optional[str] = None,
        request_id: Optional[str] = None,
        session_id: Optional[str] = None,
        endpoint: Optional[str] = None,
        database: Optional[str] = None,
    ):
        """
        Context information for error analysis.

        Args:
            operation (str, optional): Operation being performed
            request_id (str, optional): Request identifier
            session_id (str, optional): Session identifier
            endpoint (str, optional): YDB endpoint
            database (str, optional): Database path
        """
        self._operation = operation
        self._request_id = request_id
        self._session_id = session_id
        self.endpoint = endpoint
        self.database = database

    @property
    def operation(self) -> Optional[str]:
        """Operation being performed."""
        return self._operation

    @property
    def request_id(self) -> Optional[str]:
        """Request identifier."""
        return self._request_id

    @property
    def session_id(self) -> Optional[str]:
        """Session identifier."""
        return self._session_id

    def to_dict(self) -> dict:
        """Convert context to dictionary (all fields, None when unset)."""
        return {
            "operation": self._operation,
            "request_id": self._request_id,
            "session_id": self._session_id,
            "endpoint": self.endpoint,
            "database": self.database,
        }

    def __str__(self) -> str:
        """String representation of context, listing only the set fields."""
        parts = [f"{key}={value}" for key, value in self.to_dict().items() if value is not None]
        return f"ErrorContext({', '.join(parts)})"
class ErrorHandler:
    """Error handling utilities combining logging, context, and retry advice."""

    def __init__(
        self,
        logger: Optional[logging.Logger] = None,
        context: Optional["ErrorContext"] = None,
    ):
        """
        Error handling utilities.

        Args:
            logger (logging.Logger, optional): Logger for error reporting
            context (ErrorContext, optional): Error context information
        """
        self.logger = logger if logger is not None else logging.getLogger(__name__)
        self.context = context

    def handle_error(
        self,
        error: Exception,
        operation: str = None,
    ) -> bool:
        """
        Handle error with appropriate logging and classification.

        Args:
            error (Exception): Error to handle
            operation (str, optional): Operation context

        Returns:
            bool: True if error was handled
        """
        extra = {"operation": operation} if operation else None
        # Known YDB errors are logged and considered handled; anything else is
        # logged but reported as unhandled so the caller can escalate.
        if isinstance(error, Error):
            self.log_error(error, extra_context=extra)
            return True
        self.log_error(error, level=logging.ERROR, extra_context=extra)
        return False

    def should_retry(
        self,
        error: Exception,
        attempt: int,
        max_retries: int = 10,
    ) -> bool:
        """
        Determine if operation should be retried.

        Args:
            error (Exception): Error that occurred
            attempt (int): Current attempt number
            max_retries (int): Maximum retry attempts

        Returns:
            bool: True if should retry
        """
        if attempt >= max_retries:
            return False
        # Retryability is encoded in the class hierarchy.
        return isinstance(error, RetryableError)

    def log_error(
        self,
        error: Exception,
        level: int = logging.ERROR,
        extra_context: dict = None,
    ):
        """
        Log error with context information.

        Args:
            error (Exception): Error to log
            level (int): Log level
            extra_context (dict, optional): Additional context
        """
        payload = self.context.to_dict() if self.context is not None else {}
        if extra_context:
            payload.update(extra_context)
        # Lazy %-formatting; context travels via the ``extra`` dict.
        self.logger.log(level, "%s: %s", type(error).__name__, error, extra=payload or None)


import ydb
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def handle_ydb_operations():
    """Example: catch YDB errors from most specific to least specific."""
    driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
    try:
        driver.wait(fail_fast=True, timeout=5)
        session_pool = ydb.SessionPool(driver)

        def execute_query(session):
            return session.execute_query("SELECT COUNT(*) FROM users")

        # Execute with automatic retry
        result = session_pool.retry_operation_sync(execute_query)
    except ydb.ConnectionError as e:
        logger.error(f"Connection failed: {e}")
        # Handle connection issues - maybe use backup endpoint
    except ydb.UnauthorizedError as e:
        logger.error(f"Authentication failed: {e}")
        # Handle auth issues - refresh credentials
    except ydb.NotFoundError as e:
        logger.error(f"Resource not found: {e}")
        # Handle missing resources - create or use default
    except ydb.RetryableError as e:
        logger.warning(f"Retriable error occurred: {e}")
        # These are handled automatically by retry_operation_sync
    except ydb.Error as e:
        logger.error(f"YDB error: {e}")
        logger.error(f"Status code: {e.status}")
        for issue in e.issues:
            logger.error(f"Issue: {issue.message}")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    finally:
        # Tear down whatever was successfully created before the failure.
        if 'session_pool' in locals():
            session_pool.stop()
        if 'driver' in locals():
            driver.stop()


# Configure custom retry behavior
def configure_custom_retries():
    """Example: build RetrySettings with custom fast/slow backoff behavior."""
    # Fast backoff for quick operations
    fast_backoff = ydb.BackoffSettings(
        slot_duration=0.001,  # 1ms base
        ceiling=8,            # Up to 256ms
        max_backoff=0.5,      # Max 500ms
        jitter_limit=0.1,     # 10% jitter
    )
    # Slow backoff for heavy operations
    slow_backoff = ydb.BackoffSettings(
        slot_duration=2.0,    # 2s base
        ceiling=4,            # Up to 32s
        max_backoff=60.0,     # Max 1 minute
        jitter_limit=0.2,     # 20% jitter
    )
    # Custom retry settings
    retry_settings = ydb.RetrySettings(
        max_retries=5,
        fast_backoff_settings=fast_backoff,
        slow_backoff_settings=slow_backoff,
        retry_not_found=False,      # Don't retry NOT_FOUND
        retry_internal_error=True,  # Retry internal errors
        on_ydb_error_callback=lambda error: logger.warning(f"Retrying after: {error}"),
    )
    return retry_settings


# Use custom retry settings
custom_retry_settings = configure_custom_retries()


def robust_operation(session):
    # This operation will use custom retry behavior
    return session.execute_query(
        "SELECT * FROM large_table WHERE complex_condition = true"
    )


# NOTE(review): assumes a module-level ``session_pool`` exists — the one in
# handle_ydb_operations() above is local to that function; confirm.
result = session_pool.retry_operation_sync(
    robust_operation,
    retry_settings=custom_retry_settings
)


def classify_and_handle_error(error: Exception) -> bool:
    """
    Classify error and determine handling strategy.

    Returns:
        bool: True if operation should continue, False if should abort
    """
    if isinstance(error, ydb.BadRequestError):
        logger.error(f"Bad request - fix query: {error}")
        return False  # Don't continue with bad requests
    elif isinstance(error, ydb.UnauthorizedError):
        logger.error(f"Auth failed - refresh credentials: {error}")
        # Could refresh credentials here
        return False
    elif isinstance(error, ydb.NotFoundError):
        logger.warning(f"Resource not found: {error}")
        # Might create missing resource
        return True
    elif isinstance(error, ydb.OverloadedError):
        logger.warning(f"Service overloaded: {error}")
        # Implement backoff strategy
        import time
        time.sleep(5.0)  # Wait before retrying
        return True
    elif isinstance(error, ydb.SessionExpiredError):
        logger.info(f"Session expired - will get new session: {error}")
        return True  # Session pool will handle
    elif isinstance(error, ydb.ConnectionError):
        logger.warning(f"Connection issue: {error}")
        # Could try alternative endpoint
        return True
    elif isinstance(error, ydb.RetryableError):
        logger.info(f"Retriable error: {error}")
        return True
    else:
        logger.error(f"Non-retriable error: {error}")
        return False


# Example usage with manual retry logic
def manual_retry_operation(operation_func, max_attempts=3):
    """Retry ``operation_func`` up to max_attempts with exponential backoff."""
    # BUG FIX: the original called time.sleep() here without any module-level
    # ``time`` import in scope at this point.
    import time

    for attempt in range(max_attempts):
        try:
            return operation_func()
        except Exception as e:
            should_continue = classify_and_handle_error(e)
            if not should_continue or attempt == max_attempts - 1:
                raise  # Re-raise if shouldn't continue or final attempt
            # Calculate backoff
            backoff_time = min(2 ** attempt, 10)  # Exponential backoff, max 10s
            logger.info(f"Retrying in {backoff_time}s (attempt {attempt + 1}/{max_attempts})")
            time.sleep(backoff_time)


import asyncio
import ydb.aio as ydb_aio
# BUG FIX: ErrorRecoveryManager below calls time.time()/time.sleep(), but the
# original only imported ``time`` later in the file.
import time


async def async_error_handling():
    """Demonstrate async error handling patterns."""
    async with ydb_aio.Driver(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as driver:
        try:
            await driver.wait(fail_fast=True, timeout=5)
            async with ydb_aio.SessionPool(driver) as pool:
                async def async_operation(session):
                    return await session.execute_query(
                        "SELECT * FROM users WHERE active = true"
                    )

                # Use async retry
                result = await pool.retry_operation(async_operation)
        except ydb.ConnectionError as e:
            logger.error(f"Async connection failed: {e}")
            # Handle async connection issues
        except ydb.TimeoutError as e:
            logger.error(f"Async operation timed out: {e}")
            # Handle timeout with alternative strategy
        except asyncio.CancelledError:
            logger.info("Operation cancelled")
            raise  # Re-raise cancellation
        except Exception as e:
            logger.error(f"Unexpected async error: {e}")


# Run async error handling
asyncio.run(async_error_handling())


class ErrorRecoveryManager:
    """Manages error recovery strategies for YDB operations."""

    def __init__(self, driver: ydb.Driver):
        self.driver = driver
        self.session_pool = ydb.SessionPool(driver)
        self.fallback_data = {}
        # Simple circuit breaker: open after N consecutive retriable
        # failures, auto-reset after a quiet period.
        self.circuit_breaker_failures = 0
        self.circuit_breaker_threshold = 5
        self.circuit_breaker_reset_time = 60
        self.last_failure_time = 0

    def execute_with_recovery(self, operation_func, fallback_func=None):
        """Execute operation with comprehensive recovery strategy."""
        # Check circuit breaker
        if self._is_circuit_breaker_open():
            logger.warning("Circuit breaker open, using fallback")
            return self._execute_fallback(fallback_func)
        try:
            # Attempt primary operation
            result = self.session_pool.retry_operation_sync(
                operation_func,
                retry_settings=ydb.RetrySettings(max_retries=3)
            )
            # Success - reset circuit breaker
            self.circuit_breaker_failures = 0
            return result
        except ydb.OverloadedError:
            # Specific handling for overload
            logger.warning("Service overloaded, implementing backoff")
            self._handle_overload()
            return self._execute_fallback(fallback_func)
        except ydb.SessionPoolEmptyError:
            # Handle session pool exhaustion
            logger.warning("Session pool exhausted, creating new pool")
            self._recreate_session_pool()
            # Retry once with new pool
            try:
                return self.session_pool.retry_operation_sync(operation_func)
            except Exception:
                return self._execute_fallback(fallback_func)
        except ydb.ConnectionError:
            # Handle connection issues
            logger.error("Connection failed, trying endpoint discovery")
            self._handle_connection_failure()
            return self._execute_fallback(fallback_func)
        except ydb.RetryableError as e:
            # Track retriable failures for circuit breaker
            self.circuit_breaker_failures += 1
            self.last_failure_time = time.time()
            logger.error(f"Retriable error after retries: {e}")
            return self._execute_fallback(fallback_func)
        except Exception as e:
            logger.error(f"Unhandled error: {e}")
            return self._execute_fallback(fallback_func)

    def _is_circuit_breaker_open(self) -> bool:
        """Check if circuit breaker should be open."""
        if self.circuit_breaker_failures < self.circuit_breaker_threshold:
            return False
        # Check if enough time has passed to reset
        if time.time() - self.last_failure_time > self.circuit_breaker_reset_time:
            self.circuit_breaker_failures = 0
            return False
        return True

    def _execute_fallback(self, fallback_func):
        """Execute fallback strategy."""
        if fallback_func:
            try:
                return fallback_func()
            except Exception as e:
                logger.error(f"Fallback also failed: {e}")
        # Return cached/default data
        logger.info("Using cached fallback data")
        return self.fallback_data.get("default", [])

    def _handle_overload(self):
        """Handle service overload with exponential backoff."""
        backoff_time = min(2 ** self.circuit_breaker_failures, 30)
        logger.info(f"Backing off for {backoff_time}s due to overload")
        time.sleep(backoff_time)

    def _recreate_session_pool(self):
        """Recreate session pool if exhausted."""
        try:
            self.session_pool.stop()
            self.session_pool = ydb.SessionPool(self.driver, size=20)
        except Exception as e:
            logger.error(f"Failed to recreate session pool: {e}")

    def _handle_connection_failure(self):
        """Handle connection failures with endpoint rotation."""
        # Could implement endpoint discovery refresh here
        logger.info("Connection failure - would refresh endpoints")


# Usage example
# NOTE(review): assumes a module-level ``driver`` exists — not defined at
# module scope in this chunk; confirm.
recovery_manager = ErrorRecoveryManager(driver)


def get_user_data(session):
    return session.execute_query("SELECT * FROM users LIMIT 100")


def fallback_user_data():
    # Return cached or default user data
    return [{"id": 1, "name": "Default User"}]


# Execute with comprehensive error recovery
user_data = recovery_manager.execute_with_recovery(
    get_user_data,
    fallback_user_data
)


from collections import defaultdict
import time


class ErrorMetricsCollector:
    """Collect and report error metrics for monitoring."""

    def __init__(self):
        # Per-error-type counters plus timestamp lists for rate tracking.
        self.error_counts = defaultdict(int)
        self.error_rates = defaultdict(list)
        self.retry_counts = defaultdict(int)
        self.start_time = time.time()

    def record_error(self, error: Exception, operation: str = "unknown"):
        """Record error occurrence for metrics."""
        kind = type(error).__name__
        now = time.time()
        # Count by error type
        self.error_counts[kind] += 1
        # Track error rates (errors per minute)
        self.error_rates[kind].append(now)
        # Clean old entries (keep last hour)
        horizon = now - 3600
        self.error_rates[kind] = [
            stamp for stamp in self.error_rates[kind] if stamp > horizon
        ]
        # Log structured error info
        logger.info(
            "error_occurred",
            extra={
                "error_type": kind,
                "operation": operation,
                "error_message": str(error),
                "status_code": getattr(error, 'status', None)
            }
        )

    def record_retry(self, error_type: str, attempt: int):
        """Record retry attempt."""
        self.retry_counts[f"{error_type}_retry_{attempt}"] += 1

    def get_error_summary(self) -> dict:
        """Get current error summary for monitoring."""
        now = time.time()
        # Errors-per-minute is computed over the trailing 60 seconds.
        return {
            "uptime_seconds": now - self.start_time,
            "total_errors": sum(self.error_counts.values()),
            "error_counts": dict(self.error_counts),
            "retry_counts": dict(self.retry_counts),
            "error_rates_per_minute": {
                kind: sum(1 for stamp in stamps if now - stamp < 60)
                for kind, stamps in self.error_rates.items()
            },
        }

    def should_alert(self) -> bool:
        """Check if error rates warrant alerting."""
        now = time.time()
        # Alert if more than 10 errors per minute for any type
        for error_type, stamps in self.error_rates.items():
            recent_errors = sum(1 for stamp in stamps if now - stamp < 60)
            if recent_errors > 10:
                logger.warning(f"High error rate: {recent_errors}/min for {error_type}")
                return True
        return False
# Global metrics collector
metrics = ErrorMetricsCollector()


# Enhanced retry operation with metrics
def retry_with_metrics(operation_func, operation_name="unknown"):
    """Execute operation with error metrics collection.

    NOTE(review): retries every Exception up to 3 attempts with no backoff —
    confirm whether only retriable errors should loop here.
    """
    for attempt in range(3):
        try:
            return operation_func()
        except Exception as e:
            # Record error metrics
            metrics.record_error(e, operation_name)
            metrics.record_retry(type(e).__name__, attempt + 1)
            if attempt == 2:  # Final attempt
                raise
            # Check if we should alert on error rates
            if metrics.should_alert():
                logger.critical("High error rates detected - consider investigation")


# Usage with metrics
def monitored_database_operation():
    def db_query(session):
        return session.execute_query("SELECT COUNT(*) FROM orders")

    return retry_with_metrics(
        lambda: session_pool.retry_operation_sync(db_query),
        "order_count_query"
    )


# Periodic metrics reporting
def report_metrics():
    summary = metrics.get_error_summary()
    logger.info("error_summary", extra=summary)
    # Could send to monitoring system here
    # send_to_monitoring(summary)


# Schedule periodic reporting
import threading


def periodic_reporting():
    # Daemon loop: process exit is not blocked by this thread.
    while True:
        time.sleep(300)  # Report every 5 minutes
        report_metrics()


reporting_thread = threading.Thread(target=periodic_reporting, daemon=True)
reporting_thread.start()


# Type aliases for error handling
# Type aliases for error handling.
# BUG FIX: the original named this alias ``ErrorHandler``, silently clobbering
# the ErrorHandler *class* defined earlier in the module; renamed to avoid the
# collision — confirm no external code imported the old alias name.
ErrorHandlerFunc = Callable[[Exception], bool]
ErrorCallback = Callable[[Exception], None]
RetryDecision = bool
BackoffTime = float

# Error classification callables. ErrorCategory is referenced as a forward
# string so this block does not depend on definition order.
ErrorClassifier = Callable[[Exception], "ErrorCategory"]
RetryStrategy = Callable[[Exception, int], RetryDecision]
BackoffCalculator = Callable[[Exception, int], BackoffTime]

# Monitoring types.
ErrorMetric = Dict[str, Union[int, float, str]]
MetricsReporter = Callable[[ErrorMetric], None]
AlertTrigger = Callable[[ErrorMetric], bool]