0
# Error Handling and Retries
1
2
This document covers error handling in the YDB Python SDK: the error hierarchy, retry strategies, backoff configuration, and operation result processing.
3
4
## Capabilities
5
6
### Error Hierarchy
7
8
The YDB Python SDK provides a comprehensive error hierarchy for handling different types of failures.
9
10
```python { .api }
11
class Error(Exception):
    """Base class for all YDB errors."""

    # Status code of the error type. The base class has none; concrete
    # subclasses override this with a class-level StatusCode member.
    status: Optional[int] = None

    def __init__(
        self,
        message: str,
        issues: Optional[Iterable["IssueMessage"]] = None,
    ):
        """
        Base YDB error.

        Args:
            message (str): Error message
            issues (Optional[Iterable[IssueMessage]]): Detailed error issues
        """
        super().__init__(message)
        self.message = message
        # Stored privately because ``issues`` is exposed as a read-only
        # property; materialized so any iterable becomes a reusable list.
        self._issues = list(issues) if issues is not None else []

    @property
    def issues(self) -> List["IssueMessage"]:
        """Detailed error issues (empty list when none were supplied)."""
        return self._issues
37
38
class RetryableError(Error):
    """Base class for errors that can be retried."""


class BadRequestError(Error):
    """Request validation or syntax errors."""

    status = StatusCode.BAD_REQUEST


class UnauthorizedError(Error):
    """Authentication errors."""

    status = StatusCode.UNAUTHORIZED


class ForbiddenError(Error):
    """Authorization/permission errors."""

    status = StatusCode.FORBIDDEN


class NotFoundError(Error):
    """Resource not found errors."""

    status = StatusCode.NOT_FOUND


class AlreadyExistsError(Error):
    """Resource already exists errors."""

    status = StatusCode.ALREADY_EXISTS


class PreconditionFailedError(RetryableError):
    """Precondition check failures."""

    status = StatusCode.PRECONDITION_FAILED


class AbortedError(RetryableError):
    """Aborted operations due to conflicts."""

    status = StatusCode.ABORTED


class UnavailableError(RetryableError):
    """Service temporarily unavailable."""

    status = StatusCode.UNAVAILABLE


class OverloadedError(RetryableError):
    """Service overloaded, backoff required."""

    status = StatusCode.OVERLOADED


class TimeoutError(RetryableError):
    """Operation timeout errors."""

    status = StatusCode.TIMEOUT


class CancelledError(Error):
    """Cancelled operations."""

    status = StatusCode.CANCELLED


class UndeterminedError(RetryableError):
    """Operations with undetermined outcome."""

    status = StatusCode.UNDETERMINED


class InternalError(RetryableError):
    """Internal service errors."""

    status = StatusCode.INTERNAL_ERROR


class UnsupportedError(Error):
    """Unsupported operations."""

    status = StatusCode.UNSUPPORTED


class SchemeError(Error):
    """Schema-related errors."""

    status = StatusCode.SCHEME_ERROR


class GenericError(Error):
    """Generic/unclassified errors."""

    status = StatusCode.GENERIC_ERROR
104
```
105
106
### Session-Specific Errors
107
108
Errors related to session lifecycle and management.
109
110
```python { .api }
111
class BadSessionError(RetryableError):
    """Invalid or corrupted session state."""

    status = StatusCode.BAD_SESSION


class SessionExpiredError(RetryableError):
    """Session has expired and needs renewal."""

    status = StatusCode.SESSION_EXPIRED


class SessionBusyError(RetryableError):
    """Session is busy with another operation."""

    status = StatusCode.SESSION_BUSY


class SessionPoolEmptyError(RetryableError):
    """No available sessions in the pool."""

    status = StatusCode.SESSION_POOL_EMPTY


class SessionPoolClosedError(Error):
    """Session pool has been closed."""

    status = StatusCode.SESSION_POOL_CLOSED


class QueryCacheEmptyError(RetryableError):
    """Query not found in prepared query cache."""
133
```
134
135
### Connection Errors
136
137
Network and transport-level error conditions.
138
139
```python { .api }
140
class ConnectionError(RetryableError):
    """Base class for connection-related errors."""


class ConnectionLostError(ConnectionError):
    """Connection lost during operation."""

    status = StatusCode.CONNECTION_LOST


class ConnectionFailureError(ConnectionError):
    """Failed to establish connection."""

    status = StatusCode.CONNECTION_FAILURE


class DeadlineExceededError(ConnectionError):
    """Operation exceeded deadline."""

    status = StatusCode.DEADLINE_EXCEEDED


class ClientInternalError(Error):
    """Client-side internal errors."""

    status = StatusCode.CLIENT_INTERNAL_ERROR


class ClientResourceExhaustedError(Error):
    """Client resources exhausted."""


class ClientDiscoveryError(ConnectionError):
    """Endpoint discovery failures."""


class UnauthenticatedError(Error):
    """Authentication credential errors."""

    status = StatusCode.UNAUTHENTICATED


class UnimplementedError(Error):
    """Unimplemented features or operations."""

    status = StatusCode.UNIMPLEMENTED
172
```
173
174
### Status Codes
175
176
Enumeration of all possible YDB status codes.
177
178
```python { .api }
179
class StatusCode(enum.IntEnum):
    """YDB operation status codes.

    Every member must have a distinct value: in an ``IntEnum`` a repeated
    value silently turns the later name into an alias of the earlier one, so
    two statuses sharing a number could not be told apart.

    Server-reported statuses use the numeric values of the YDB API
    ``StatusIds.StatusCode`` protobuf enum; transport and client-pool
    statuses use the SDK-local 401xxx / 402xxx ranges.
    """

    # Success
    SUCCESS = 400000

    # Client errors
    BAD_REQUEST = 400010
    UNAUTHORIZED = 400020
    # NOTE(review): no FORBIDDEN member was found in the StatusIds list the
    # other values were taken from; value kept as originally documented --
    # confirm against the SDK before relying on it.
    FORBIDDEN = 403
    NOT_FOUND = 400140
    ALREADY_EXISTS = 400130
    PRECONDITION_FAILED = 400120
    UNSUPPORTED = 400180

    # Server errors
    INTERNAL_ERROR = 400030
    UNAVAILABLE = 400050
    TIMEOUT = 400090
    OVERLOADED = 400060

    # YDB-specific
    ABORTED = 400040
    CANCELLED = 400160
    UNDETERMINED = 400170
    SCHEME_ERROR = 400070
    GENERIC_ERROR = 400080
    BAD_SESSION = 400100
    SESSION_EXPIRED = 400150
    SESSION_BUSY = 400190

    # Transport errors
    CONNECTION_LOST = 401010
    CONNECTION_FAILURE = 401020
    DEADLINE_EXCEEDED = 401030
    CLIENT_INTERNAL_ERROR = 401040
    UNIMPLEMENTED = 401050

    # Client pool errors
    UNAUTHENTICATED = 402030
    SESSION_POOL_EMPTY = 402040
    SESSION_POOL_CLOSED = 402050
221
222
def is_retryable_error(error: Exception) -> bool:
    """
    Check if error is retryable.

    Args:
        error (Exception): Error to check

    Returns:
        bool: True if error can be retried
    """
232
233
def get_error_class(status_code: int) -> Type["Error"]:
    """
    Get error class for status code.

    Args:
        status_code (int): YDB status code

    Returns:
        Type[Error]: Appropriate error class
    """
243
```
244
245
### Issue Messages
246
247
Detailed error information with nested issue structures.
248
249
```python { .api }
250
class IssueMessage:
    """Single diagnostic issue attached to an error; issues may nest."""

    def __init__(
        self,
        message: str,
        issue_code: Optional[int] = None,
        severity: Optional[int] = None,
        issues: Optional[List["IssueMessage"]] = None,
    ):
        """
        Detailed error issue information.

        Args:
            message (str): Issue description
            issue_code (int, optional): Issue-specific code
            severity (int, optional): Issue severity level
            issues (List[IssueMessage], optional): Nested sub-issues
        """

    @property
    def message(self) -> str:
        """Issue description message."""

    @property
    def issue_code(self) -> Optional[int]:
        """Issue-specific code."""

    @property
    def severity(self) -> Optional[int]:
        """Issue severity level."""

    @property
    def issues(self) -> List["IssueMessage"]:
        """Nested sub-issues."""

    def to_dict(self) -> dict:
        """
        Convert issue to dictionary representation.

        Returns:
            dict: Issue as dictionary
        """

    def __str__(self) -> str:
        """String representation of the issue."""
294
295
class IssueSeverity(enum.IntEnum):
    """Issue severity levels, ordered from least (INFO) to most (FATAL) severe."""

    INFO = 1
    NOTICE = 2
    WARNING = 3
    ERROR = 4
    FATAL = 5
302
```
303
304
### Retry Configuration
305
306
Configurable retry strategies with exponential backoff and jitter.
307
308
```python { .api }
309
class RetrySettings:
    """Retry policy: attempt limit, backoff profiles and error hooks."""

    def __init__(
        self,
        max_retries: int = 10,
        max_session_acquire_timeout: Optional[float] = None,
        fast_backoff_settings: Optional["BackoffSettings"] = None,
        slow_backoff_settings: Optional["BackoffSettings"] = None,
        retry_not_found: bool = False,
        retry_internal_error: bool = True,
        unknown_error_handler: Optional[Callable[[Exception], bool]] = None,
        on_ydb_error_callback: Optional[Callable[["Error"], None]] = None,
    ):
        """
        Retry configuration for YDB operations.

        Args:
            max_retries (int): Maximum number of retry attempts
            max_session_acquire_timeout (float, optional): Session acquisition timeout
            fast_backoff_settings (BackoffSettings, optional): Fast retry backoff
            slow_backoff_settings (BackoffSettings, optional): Slow retry backoff
            retry_not_found (bool): Whether to retry NotFound errors
            retry_internal_error (bool): Whether to retry internal errors
            unknown_error_handler (Callable, optional): Handler for unknown errors
            on_ydb_error_callback (Callable, optional): Error callback function,
                called with the YDB ``Error`` instance
        """

    @property
    def max_retries(self) -> int:
        """Maximum retry attempts."""

    @property
    def fast_backoff_settings(self) -> "BackoffSettings":
        """Fast backoff configuration."""

    @property
    def slow_backoff_settings(self) -> "BackoffSettings":
        """Slow backoff configuration."""

    def with_max_retries(self, max_retries: int) -> "RetrySettings":
        """
        Create copy with different max retries.

        Args:
            max_retries (int): New max retry count

        Returns:
            RetrySettings: New retry settings instance
        """

    def with_fast_backoff(self, backoff: "BackoffSettings") -> "RetrySettings":
        """
        Create copy with different fast backoff.

        Args:
            backoff (BackoffSettings): New fast backoff settings

        Returns:
            RetrySettings: New retry settings instance
        """
368
369
class BackoffSettings:
    """Parameters of one exponential backoff profile."""

    def __init__(
        self,
        slot_duration: float = 1.0,
        ceiling: int = 6,
        max_backoff: float = 32.0,
        jitter_limit: float = 1.0,
        uncertain_ratio: float = 0.1,
    ):
        """
        Exponential backoff configuration.

        Args:
            slot_duration (float): Base slot duration in seconds
            ceiling (int): Backoff ceiling (2^ceiling * slot_duration max)
            max_backoff (float): Maximum backoff time in seconds
            jitter_limit (float): Maximum jitter multiplier
            uncertain_ratio (float): Ratio for uncertain error handling
        """

    @property
    def slot_duration(self) -> float:
        """Base slot duration."""

    @property
    def ceiling(self) -> int:
        """Backoff ceiling exponent."""

    @property
    def max_backoff(self) -> float:
        """Maximum backoff time."""

    def calculate_backoff(self, attempt: int) -> float:
        """
        Calculate backoff time for attempt.

        Args:
            attempt (int): Retry attempt number (0-based)

        Returns:
            float: Backoff time in seconds
        """

    def with_jitter(self, backoff: float) -> float:
        """
        Apply jitter to backoff time.

        Args:
            backoff (float): Base backoff time

        Returns:
            float: Jittered backoff time
        """


# Predefined backoff settings
DEFAULT_FAST_BACKOFF = BackoffSettings(slot_duration=0.005, ceiling=10, max_backoff=0.2)
DEFAULT_SLOW_BACKOFF = BackoffSettings(slot_duration=1.0, ceiling=6, max_backoff=32.0)
426
```
427
428
### Retry Operations
429
430
High-level retry functionality for database operations.
431
432
```python { .api }
433
def retry_operation_sync(
    callee: Callable[..., Any],
    retry_settings: Optional["RetrySettings"] = None,
    session_pool: Optional["SessionPool"] = None,
    *args,
    **kwargs
) -> Any:
    """
    Execute operation with retry logic.

    Args:
        callee (Callable): Function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        session_pool (SessionPool, optional): Session pool for session-based operations
        *args: Arguments for callee
        **kwargs: Keyword arguments for callee

    Returns:
        Any: Result of successful callee execution

    Raises:
        Error: Final error if all retries exhausted
    """
456
457
async def retry_operation(
    callee: Callable[..., Awaitable[Any]],
    retry_settings: Optional["RetrySettings"] = None,
    session_pool: Optional["SessionPool"] = None,
    *args,
    **kwargs
) -> Any:
    """
    Execute async operation with retry logic.

    Args:
        callee (Callable): Async function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        session_pool (SessionPool, optional): Session pool for session-based operations
        *args: Arguments for callee
        **kwargs: Keyword arguments for callee

    Returns:
        Any: Result of successful callee execution

    Raises:
        Error: Final error if all retries exhausted
    """
480
481
class YdbRetryOperationSleepOpt:
    """Sleep options for retry operations."""

    def __init__(
        self,
        timeout: Optional[float] = None,
        backoff_settings: Optional["BackoffSettings"] = None,
    ):
        """
        Sleep options for retry operations.

        Args:
            timeout (float, optional): Maximum sleep time
            backoff_settings (BackoffSettings, optional): Backoff configuration
        """
494
495
class YdbRetryOperationFinalResult:
    """Final result of retry operation."""

    def __init__(
        self,
        result: Any = None,
        error: Optional[Exception] = None,
        attempts: int = 0,
    ):
        """
        Final result of retry operation.

        Args:
            result (Any, optional): Operation result if successful
            error (Exception, optional): Final error if failed
            attempts (int): Number of attempts made
        """

    @property
    def is_success(self) -> bool:
        """True if operation succeeded."""

    @property
    def is_failure(self) -> bool:
        """True if operation failed."""
518
519
def retry_operation_impl(
    callee: Callable,
    retry_settings: Optional["RetrySettings"] = None,
    *args,
    **kwargs
) -> "YdbRetryOperationFinalResult":
    """
    Low-level retry implementation.

    Args:
        callee (Callable): Function to execute
        retry_settings (RetrySettings, optional): Retry configuration
        *args: Arguments for callee
        **kwargs: Keyword arguments for callee

    Returns:
        YdbRetryOperationFinalResult: Operation result with metadata
    """
537
```
538
539
### Error Classification
540
541
Utilities for categorizing and handling different error types.
542
543
```python { .api }
544
def classify_error(error: Exception) -> "ErrorCategory":
    """
    Classify error into category for handling strategy.

    Args:
        error (Exception): Error to classify

    Returns:
        ErrorCategory: Error category
    """
554
555
class ErrorCategory(enum.Enum):
    """Error classification categories."""

    RETRIABLE_FAST = "retriable_fast"            # Quick retry with fast backoff
    RETRIABLE_SLOW = "retriable_slow"            # Retry with slow backoff
    RETRIABLE_UNCERTAIN = "retriable_uncertain"  # Uncertain outcome, careful retry
    NON_RETRIABLE = "non_retriable"              # Don't retry these errors
    FATAL = "fatal"                              # Fatal errors, stop immediately
562
563
def is_transport_error(error: Exception) -> bool:
    """
    Check if error is transport/network related.

    Args:
        error (Exception): Error to check

    Returns:
        bool: True if transport error
    """


def is_server_error(error: Exception) -> bool:
    """
    Check if error is server-side.

    Args:
        error (Exception): Error to check

    Returns:
        bool: True if server error
    """


def is_client_error(error: Exception) -> bool:
    """
    Check if error is client-side.

    Args:
        error (Exception): Error to check

    Returns:
        bool: True if client error
    """
595
596
def should_retry_error(
    error: Exception,
    retry_settings: Optional["RetrySettings"] = None,
) -> bool:
    """
    Determine if error should be retried based on settings.

    Args:
        error (Exception): Error to evaluate
        retry_settings (RetrySettings, optional): Retry configuration

    Returns:
        bool: True if error should be retried
    """
610
611
def get_retry_backoff(
    error: Exception,
    attempt: int,
    retry_settings: Optional["RetrySettings"] = None,
) -> float:
    """
    Calculate appropriate backoff time for error and attempt.

    Args:
        error (Exception): Error that occurred
        attempt (int): Retry attempt number
        retry_settings (RetrySettings, optional): Retry configuration

    Returns:
        float: Backoff time in seconds
    """
627
```
628
629
### Error Context
630
631
Context management for error handling and debugging.
632
633
```python { .api }
634
class ErrorContext:
    """Context information (operation, ids, endpoint, database) for error analysis."""

    def __init__(
        self,
        operation: Optional[str] = None,
        request_id: Optional[str] = None,
        session_id: Optional[str] = None,
        endpoint: Optional[str] = None,
        database: Optional[str] = None,
    ):
        """
        Context information for error analysis.

        Args:
            operation (str, optional): Operation being performed
            request_id (str, optional): Request identifier
            session_id (str, optional): Session identifier
            endpoint (str, optional): YDB endpoint
            database (str, optional): Database path
        """

    @property
    def operation(self) -> Optional[str]:
        """Operation being performed."""

    @property
    def request_id(self) -> Optional[str]:
        """Request identifier."""

    @property
    def session_id(self) -> Optional[str]:
        """Session identifier."""

    def to_dict(self) -> dict:
        """Convert context to dictionary."""

    def __str__(self) -> str:
        """String representation of context."""
671
672
class ErrorHandler:
    """Error handling utilities: classification, retry decisions and logging."""

    def __init__(
        self,
        logger: Optional[logging.Logger] = None,
        context: Optional["ErrorContext"] = None,
    ):
        """
        Error handling utilities.

        Args:
            logger (logging.Logger, optional): Logger for error reporting
            context (ErrorContext, optional): Error context information
        """

    def handle_error(
        self,
        error: Exception,
        operation: Optional[str] = None,
    ) -> bool:
        """
        Handle error with appropriate logging and classification.

        Args:
            error (Exception): Error to handle
            operation (str, optional): Operation context

        Returns:
            bool: True if error was handled
        """

    def should_retry(
        self,
        error: Exception,
        attempt: int,
        max_retries: int = 10,
    ) -> bool:
        """
        Determine if operation should be retried.

        Args:
            error (Exception): Error that occurred
            attempt (int): Current attempt number
            max_retries (int): Maximum retry attempts

        Returns:
            bool: True if should retry
        """

    def log_error(
        self,
        error: Exception,
        level: int = logging.ERROR,
        extra_context: Optional[dict] = None,
    ):
        """
        Log error with context information.

        Args:
            error (Exception): Error to log
            level (int): Log level
            extra_context (dict, optional): Additional context
        """
734
```
735
736
## Usage Examples
737
738
### Basic Error Handling
739
740
```python
741
import logging

import ydb

# Configure logging: INFO and above go to the root handler; the module-level
# logger below is the one used by the examples.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
747
748
def handle_ydb_operations():
    """Run a sample query and show per-error-type handling.

    Returns:
        The query result on success, or None when an error was caught and
        logged.
    """
    driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
    # Bound before the try block so the cleanup in ``finally`` can test it
    # directly, even when pool creation itself is never reached.
    session_pool = None

    try:
        driver.wait(fail_fast=True, timeout=5)
        session_pool = ydb.SessionPool(driver)

        def execute_query(session):
            return session.execute_query("SELECT COUNT(*) FROM users")

        # Execute with automatic retry
        return session_pool.retry_operation_sync(execute_query)

    except ydb.ConnectionError as e:
        logger.error(f"Connection failed: {e}")
        # Handle connection issues - maybe use backup endpoint

    except ydb.UnauthorizedError as e:
        logger.error(f"Authentication failed: {e}")
        # Handle auth issues - refresh credentials

    except ydb.NotFoundError as e:
        logger.error(f"Resource not found: {e}")
        # Handle missing resources - create or use default

    except ydb.RetryableError as e:
        logger.warning(f"Retriable error occurred: {e}")
        # These are handled automatically by retry_operation_sync

    except ydb.Error as e:
        logger.error(f"YDB error: {e}")
        logger.error(f"Status code: {e.status}")
        for issue in e.issues:
            logger.error(f"Issue: {issue.message}")

    except Exception as e:
        logger.error(f"Unexpected error: {e}")

    finally:
        # Release resources in reverse order of acquisition.
        if session_pool is not None:
            session_pool.stop()
        driver.stop()

    return None
791
```
792
793
### Custom Retry Configuration
794
795
```python
796
# Configure custom retry behavior
def configure_custom_retries():
    """Build retry settings with separate fast and slow backoff profiles.

    Returns:
        ydb.RetrySettings: Settings limited to 5 retries that log a warning
        through the module-level ``logger`` on each YDB error.
    """
    # Fast backoff for quick operations
    fast_backoff = ydb.BackoffSettings(
        slot_duration=0.001,  # 1ms base
        ceiling=8,            # Up to 256ms
        max_backoff=0.5,      # Max 500ms
        jitter_limit=0.1,     # 10% jitter
    )

    # Slow backoff for heavy operations
    slow_backoff = ydb.BackoffSettings(
        slot_duration=2.0,  # 2s base
        ceiling=4,          # Up to 32s
        max_backoff=60.0,   # Max 1 minute
        jitter_limit=0.2,   # 20% jitter
    )

    # Custom retry settings
    retry_settings = ydb.RetrySettings(
        max_retries=5,
        fast_backoff_settings=fast_backoff,
        slow_backoff_settings=slow_backoff,
        retry_not_found=False,      # Don't retry NOT_FOUND
        retry_internal_error=True,  # Retry internal errors
        on_ydb_error_callback=lambda error: logger.warning(f"Retrying after: {error}"),
    )

    return retry_settings
825
826
# Use custom retry settings
custom_retry_settings = configure_custom_retries()


def robust_operation(session):
    """Run the sample query on the session supplied by the pool."""
    # This operation will use custom retry behavior
    return session.execute_query(
        "SELECT * FROM large_table WHERE complex_condition = true"
    )


result = session_pool.retry_operation_sync(
    robust_operation,
    retry_settings=custom_retry_settings,
)
839
```
840
841
### Error Classification and Handling
842
843
```python
844
def classify_and_handle_error(error: Exception) -> bool:
    """
    Classify error and determine handling strategy.

    The checks run from most to least specific, so a subclass such as
    OverloadedError is handled before the generic RetryableError case.

    Args:
        error (Exception): Error raised by the operation

    Returns:
        bool: True if operation should continue, False if should abort
    """
    # Function-scope import so this function does not depend on the
    # surrounding snippet's module-level imports.
    import time

    if isinstance(error, ydb.BadRequestError):
        logger.error(f"Bad request - fix query: {error}")
        return False  # Don't continue with bad requests

    if isinstance(error, ydb.UnauthorizedError):
        logger.error(f"Auth failed - refresh credentials: {error}")
        # Could refresh credentials here
        return False

    if isinstance(error, ydb.NotFoundError):
        logger.warning(f"Resource not found: {error}")
        # Might create missing resource
        return True

    if isinstance(error, ydb.OverloadedError):
        logger.warning(f"Service overloaded: {error}")
        # Implement backoff strategy
        time.sleep(5.0)  # Wait before retrying
        return True

    if isinstance(error, ydb.SessionExpiredError):
        logger.info(f"Session expired - will get new session: {error}")
        return True  # Session pool will handle

    if isinstance(error, ydb.ConnectionError):
        logger.warning(f"Connection issue: {error}")
        # Could try alternative endpoint
        return True

    if isinstance(error, ydb.RetryableError):
        logger.info(f"Retriable error: {error}")
        return True

    logger.error(f"Non-retriable error: {error}")
    return False
889
890
# Example usage with manual retry logic
def manual_retry_operation(operation_func, max_attempts=3):
    """Call ``operation_func`` until it succeeds or attempts run out.

    Args:
        operation_func (Callable): Zero-argument callable to execute
        max_attempts (int): Total number of attempts, at least 1

    Returns:
        Whatever ``operation_func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_attempts`` is less than 1
        Exception: The operation's own exception, re-raised when
            classify_and_handle_error says not to continue or when the
            final attempt fails
    """
    # Function-scope import: the sleep below needs ``time`` and the
    # surrounding snippet has no module-level import of it.
    import time

    if max_attempts < 1:
        # Without this the loop would not run and None would be returned
        # without the operation ever being called.
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts):
        try:
            return operation_func()

        except Exception as e:
            should_continue = classify_and_handle_error(e)

            if not should_continue or attempt == max_attempts - 1:
                raise  # Re-raise if shouldn't continue or final attempt

            # Calculate backoff
            backoff_time = min(2 ** attempt, 10)  # Exponential backoff, max 10s
            logger.info(f"Retrying in {backoff_time}s (attempt {attempt + 1}/{max_attempts})")
            time.sleep(backoff_time)
906
```
907
908
### Async Error Handling
909
910
```python
911
import asyncio

import ydb  # needed for the ydb.* exception classes caught below
import ydb.aio as ydb_aio


async def async_error_handling():
    """Demonstrate async error handling patterns.

    Returns:
        The query result on success, or None when an error was caught and
        logged. Cancellation is logged and re-raised.
    """
    async with ydb_aio.Driver(
        endpoint="grpc://localhost:2136",
        database="/local",
    ) as driver:

        try:
            await driver.wait(fail_fast=True, timeout=5)

            async with ydb_aio.SessionPool(driver) as pool:

                async def async_operation(session):
                    return await session.execute_query(
                        "SELECT * FROM users WHERE active = true"
                    )

                # Use async retry
                return await pool.retry_operation(async_operation)

        except ydb.ConnectionError as e:
            logger.error(f"Async connection failed: {e}")
            # Handle async connection issues

        except ydb.TimeoutError as e:
            logger.error(f"Async operation timed out: {e}")
            # Handle timeout with alternative strategy

        except asyncio.CancelledError:
            logger.info("Operation cancelled")
            raise  # Re-raise cancellation

        except Exception as e:
            logger.error(f"Unexpected async error: {e}")

    return None


# Run async error handling
asyncio.run(async_error_handling())
952
```
953
954
### Error Recovery Strategies
955
956
```python
957
import time


class ErrorRecoveryManager:
    """Manages error recovery strategies for YDB operations.

    Combines the session pool's retries with a simple circuit breaker and a
    fallback (caller-supplied function, else cached default data).
    """

    def __init__(self, driver: "ydb.Driver"):
        self.driver = driver
        self.session_pool = ydb.SessionPool(driver)
        self.fallback_data = {}
        self.circuit_breaker_failures = 0
        self.circuit_breaker_threshold = 5
        self.circuit_breaker_reset_time = 60  # seconds
        # time.monotonic() reading taken at the most recent counted failure.
        self.last_failure_time = 0

    def execute_with_recovery(self, operation_func, fallback_func=None):
        """Execute operation with comprehensive recovery strategy.

        Args:
            operation_func (Callable): Callable taking a session, run through
                the session pool's retry_operation_sync
            fallback_func (Callable, optional): Zero-argument callable used
                when the primary operation cannot be completed

        Returns:
            The operation result, or the fallback result when the circuit
            breaker is open or the operation ultimately fails.
        """
        # Check circuit breaker
        if self._is_circuit_breaker_open():
            logger.warning("Circuit breaker open, using fallback")
            return self._execute_fallback(fallback_func)

        try:
            # Attempt primary operation
            result = self.session_pool.retry_operation_sync(
                operation_func,
                retry_settings=ydb.RetrySettings(max_retries=3),
            )

            # Success - reset circuit breaker
            self.circuit_breaker_failures = 0
            return result

        except ydb.OverloadedError:
            # Specific handling for overload
            logger.warning("Service overloaded, implementing backoff")
            self._handle_overload()
            # Counted so repeated overloads lengthen the backoff and can
            # eventually open the circuit breaker.
            self._record_failure()
            return self._execute_fallback(fallback_func)

        except ydb.SessionPoolEmptyError:
            # Handle session pool exhaustion
            logger.warning("Session pool exhausted, creating new pool")
            self._recreate_session_pool()
            # Retry once with new pool
            try:
                return self.session_pool.retry_operation_sync(operation_func)
            except Exception as e:
                logger.error(f"Retry with new session pool failed: {e}")
                return self._execute_fallback(fallback_func)

        except ydb.ConnectionError:
            # Handle connection issues
            logger.error("Connection failed, trying endpoint discovery")
            self._handle_connection_failure()
            self._record_failure()
            return self._execute_fallback(fallback_func)

        except ydb.RetryableError as e:
            # Track retriable failures for circuit breaker
            self._record_failure()
            logger.error(f"Retriable error after retries: {e}")
            return self._execute_fallback(fallback_func)

        except Exception as e:
            logger.error(f"Unhandled error: {e}")
            return self._execute_fallback(fallback_func)

    def _record_failure(self):
        """Count one failure toward the circuit breaker and timestamp it."""
        self.circuit_breaker_failures += 1
        # Monotonic clock: immune to wall-clock adjustments.
        self.last_failure_time = time.monotonic()

    def _is_circuit_breaker_open(self) -> bool:
        """Check if circuit breaker should be open."""
        if self.circuit_breaker_failures < self.circuit_breaker_threshold:
            return False

        # Check if enough time has passed to reset
        if time.monotonic() - self.last_failure_time > self.circuit_breaker_reset_time:
            self.circuit_breaker_failures = 0
            return False

        return True

    def _execute_fallback(self, fallback_func):
        """Execute fallback strategy.

        Returns the result of ``fallback_func`` when it is given and
        succeeds; otherwise the cached "default" entry (an empty list when
        nothing is cached).
        """
        if fallback_func:
            try:
                return fallback_func()
            except Exception as e:
                logger.error(f"Fallback also failed: {e}")

        # Return cached/default data
        logger.info("Using cached fallback data")
        return self.fallback_data.get("default", [])

    def _handle_overload(self):
        """Handle service overload with exponential backoff (capped at 30s)."""
        backoff_time = min(2 ** self.circuit_breaker_failures, 30)
        logger.info(f"Backing off for {backoff_time}s due to overload")
        time.sleep(backoff_time)

    def _recreate_session_pool(self):
        """Recreate session pool if exhausted."""
        # Stop and re-create separately: a failure to stop the old pool must
        # not prevent the replacement from being created.
        try:
            self.session_pool.stop()
        except Exception as e:
            logger.error(f"Failed to stop exhausted session pool: {e}")

        try:
            self.session_pool = ydb.SessionPool(self.driver, size=20)
        except Exception as e:
            logger.error(f"Failed to recreate session pool: {e}")

    def _handle_connection_failure(self):
        """Handle connection failures with endpoint rotation."""
        # Could implement endpoint discovery refresh here
        logger.info("Connection failure - would refresh endpoints")
1063
1064
# Usage example
recovery_manager = ErrorRecoveryManager(driver)


def get_user_data(session):
    """Primary operation: query up to 100 users on the given session."""
    return session.execute_query("SELECT * FROM users LIMIT 100")


def fallback_user_data():
    """Fallback used when the primary operation cannot be completed."""
    # Return cached or default user data
    return [{"id": 1, "name": "Default User"}]


# Execute with comprehensive error recovery
user_data = recovery_manager.execute_with_recovery(
    get_user_data,
    fallback_user_data,
)
1079
```
1080
1081
### Error Monitoring and Metrics
1082
1083
```python
1084
from collections import defaultdict
1085
import time
1086
1087
class ErrorMetricsCollector:
    """Collect and report error metrics for monitoring."""

    # Window, in seconds, used for the "errors per minute" figures.
    RATE_WINDOW_SECONDS = 60
    # How long, in seconds, individual error timestamps are retained.
    RETENTION_SECONDS = 3600
    # should_alert() fires when one error type exceeds this count in a window.
    ALERT_THRESHOLD = 10

    def __init__(self):
        self.error_counts = defaultdict(int)
        # error type name -> list of time.monotonic() readings
        self.error_rates = defaultdict(list)
        self.retry_counts = defaultdict(int)
        # Monotonic clock: only differences are used, and it is immune to
        # wall-clock adjustments.
        self.start_time = time.monotonic()

    def _recent_count(self, timestamps, now) -> int:
        """Count timestamps that fall inside the rate window ending at ``now``."""
        return sum(1 for t in timestamps if now - t < self.RATE_WINDOW_SECONDS)

    def record_error(self, error: Exception, operation: str = "unknown"):
        """Record error occurrence for metrics.

        Args:
            error (Exception): Error that occurred
            operation (str): Name of the operation that failed
        """
        error_type = type(error).__name__
        current_time = time.monotonic()

        # Count by error type
        self.error_counts[error_type] += 1

        # Track error rates (errors per minute)
        self.error_rates[error_type].append(current_time)

        # Clean old entries (keep last hour)
        cutoff_time = current_time - self.RETENTION_SECONDS
        self.error_rates[error_type] = [
            t for t in self.error_rates[error_type] if t > cutoff_time
        ]

        # Log structured error info
        logger.info(
            "error_occurred",
            extra={
                "error_type": error_type,
                "operation": operation,
                "error_message": str(error),
                "status_code": getattr(error, 'status', None),
            },
        )

    def record_retry(self, error_type: str, attempt: int):
        """Record retry attempt."""
        self.retry_counts[f"{error_type}_retry_{attempt}"] += 1

    def get_error_summary(self) -> dict:
        """Get current error summary for monitoring.

        Returns:
            dict: Uptime, total and per-type error counts, retry counts and
            the number of errors per type within the last rate window.
        """
        current_time = time.monotonic()

        return {
            "uptime_seconds": current_time - self.start_time,
            "total_errors": sum(self.error_counts.values()),
            "error_counts": dict(self.error_counts),
            "retry_counts": dict(self.retry_counts),
            # Calculate error rates per minute
            "error_rates_per_minute": {
                error_type: self._recent_count(timestamps, current_time)
                for error_type, timestamps in self.error_rates.items()
            },
        }

    def should_alert(self) -> bool:
        """Check if error rates warrant alerting."""
        current_time = time.monotonic()

        # Alert if more than 10 errors per minute for any type
        for error_type, timestamps in self.error_rates.items():
            recent_errors = self._recent_count(timestamps, current_time)
            if recent_errors > self.ALERT_THRESHOLD:
                logger.warning(f"High error rate: {recent_errors}/min for {error_type}")
                return True

        return False


# Global metrics collector
metrics = ErrorMetricsCollector()
1163
1164
# Enhanced retry operation with metrics
def retry_with_metrics(operation_func, operation_name="unknown", max_attempts=3):
    """Execute operation with error metrics collection.

    Args:
        operation_func (Callable): Zero-argument callable to execute
        operation_name (str): Name recorded with each error
        max_attempts (int): Total number of attempts, at least 1

    Returns:
        Whatever ``operation_func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_attempts`` is less than 1
        Exception: The operation's own exception once the final attempt fails
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(max_attempts):
        try:
            return operation_func()

        except Exception as e:
            # Record error metrics
            metrics.record_error(e, operation_name)
            metrics.record_retry(type(e).__name__, attempt + 1)

            if attempt == max_attempts - 1:  # Final attempt
                raise

            # Check if we should alert on error rates
            if metrics.should_alert():
                logger.critical("High error rates detected - consider investigation")
1183
1184
# Usage with metrics
def monitored_database_operation():
    """Run the order-count query through the pool, recording error metrics."""

    def db_query(session):
        return session.execute_query("SELECT COUNT(*) FROM orders")

    return retry_with_metrics(
        lambda: session_pool.retry_operation_sync(db_query),
        "order_count_query",
    )
1193
1194
# Periodic metrics reporting
def report_metrics():
    """Log the current error summary as structured ``extra`` fields."""
    summary = metrics.get_error_summary()
    logger.info("error_summary", extra=summary)

    # Could send to monitoring system here
    # send_to_monitoring(summary)
1201
1202
# Schedule periodic reporting
import threading


def periodic_reporting(interval=300):
    """Report metrics forever, sleeping ``interval`` seconds between reports.

    Never returns; run it on a daemon thread so it cannot keep the process
    alive at exit.

    Args:
        interval (float): Seconds to sleep before each report (default
            300, i.e. every 5 minutes)
    """
    while True:
        time.sleep(interval)  # Report every 5 minutes by default
        report_metrics()


reporting_thread = threading.Thread(target=periodic_reporting, daemon=True)
reporting_thread.start()
1211
```
1212
1213
## Type Definitions
1214
1215
```python { .api }
1216
# Type aliases for error handling
# NOTE(review): the alias name ErrorHandler is also the name of the
# ErrorHandler class in the "Error Context" section; if both live in one
# namespace, whichever is defined later shadows the other -- confirm which
# one is intended to be exported.
ErrorHandler = Callable[[Exception], bool]
ErrorCallback = Callable[[Exception], None]
RetryDecision = bool
BackoffTime = float

# Error classification
ErrorClassifier = Callable[[Exception], "ErrorCategory"]
RetryStrategy = Callable[[Exception, int], RetryDecision]
BackoffCalculator = Callable[[Exception, int], BackoffTime]

# Monitoring types
ErrorMetric = Dict[str, Union[int, float, str]]
MetricsReporter = Callable[[ErrorMetric], None]
AlertTrigger = Callable[[ErrorMetric], bool]
1231
```