or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md, authentication.md, data-types.md, dbapi-interface.md, driver-connection.md, error-handling.md, index.md, query-service.md, schema-operations.md, sqlalchemy-integration.md, table-operations.md, topic-operations.md

docs/error-handling.md

0

# Error Handling and Retries

1

2

Comprehensive error handling with detailed error hierarchies, retry strategies, backoff configurations, and operation result processing.

3

4

## Capabilities

5

6

### Error Hierarchy

7

8

YDB Python SDK provides a comprehensive error hierarchy for handling different types of failures.

9

10

```python { .api }

11

class Error(Exception):
    """Base class for all YDB errors.

    Attributes:
        message (str): Error message passed to the constructor.
    """

    def __init__(
        self,
        message: str,
        issues: Optional[Iterable["IssueMessage"]] = None
    ):
        """
        Base YDB error.

        Args:
            message (str): Error message
            issues (Optional[Iterable[IssueMessage]]): Detailed error issues
        """
        super().__init__(message)
        self.message = message
        # Kept in a private attribute: ``issues`` is exposed below as a
        # read-only property, so assigning ``self.issues`` would raise
        # AttributeError. Materialized as a list to match the property's
        # declared return type even when an arbitrary iterable is passed.
        self._issues = list(issues) if issues is not None else []

    @property
    def status(self) -> Optional[int]:
        """Error status code (None on the base class)."""
        return None

    @property
    def issues(self) -> List["IssueMessage"]:
        """Detailed error issues."""
        return self._issues

37

38

class RetryableError(Error):

39

"""Base class for errors that can be retried."""

40

41

class BadRequestError(Error):

42

"""Request validation or syntax errors."""

43

status = StatusCode.BAD_REQUEST

44

45

class UnauthorizedError(Error):

46

"""Authentication errors."""

47

status = StatusCode.UNAUTHORIZED

48

49

class ForbiddenError(Error):

50

"""Authorization/permission errors."""

51

status = StatusCode.FORBIDDEN

52

53

class NotFoundError(Error):

54

"""Resource not found errors."""

55

status = StatusCode.NOT_FOUND

56

57

class AlreadyExistsError(Error):

58

"""Resource already exists errors."""

59

status = StatusCode.ALREADY_EXISTS

60

61

class PreconditionFailedError(RetryableError):

62

"""Precondition check failures."""

63

status = StatusCode.PRECONDITION_FAILED

64

65

class AbortedError(RetryableError):

66

"""Aborted operations due to conflicts."""

67

status = StatusCode.ABORTED

68

69

class UnavailableError(RetryableError):

70

"""Service temporarily unavailable."""

71

status = StatusCode.UNAVAILABLE

72

73

class OverloadedError(RetryableError):

74

"""Service overloaded, backoff required."""

75

status = StatusCode.OVERLOADED

76

77

class TimeoutError(RetryableError):

78

"""Operation timeout errors."""

79

status = StatusCode.TIMEOUT

80

81

class CancelledError(Error):

82

"""Cancelled operations."""

83

status = StatusCode.CANCELLED

84

85

class UndeterminedError(RetryableError):

86

"""Operations with undetermined outcome."""

87

status = StatusCode.UNDETERMINED

88

89

class InternalError(RetryableError):

90

"""Internal service errors."""

91

status = StatusCode.INTERNAL_ERROR

92

93

class UnsupportedError(Error):

94

"""Unsupported operations."""

95

status = StatusCode.UNSUPPORTED

96

97

class SchemeError(Error):

98

"""Schema-related errors."""

99

status = StatusCode.SCHEME_ERROR

100

101

class GenericError(Error):

102

"""Generic/unclassified errors."""

103

status = StatusCode.GENERIC_ERROR

104

```

105

106

### Session-Specific Errors

107

108

Errors related to session lifecycle and management.

109

110

```python { .api }

111

class BadSessionError(RetryableError):

112

"""Invalid or corrupted session state."""

113

status = StatusCode.BAD_SESSION

114

115

class SessionExpiredError(RetryableError):

116

"""Session has expired and needs renewal."""

117

status = StatusCode.SESSION_EXPIRED

118

119

class SessionBusyError(RetryableError):

120

"""Session is busy with another operation."""

121

status = StatusCode.SESSION_BUSY

122

123

class SessionPoolEmptyError(RetryableError):

124

"""No available sessions in the pool."""

125

status = StatusCode.SESSION_POOL_EMPTY

126

127

class SessionPoolClosedError(Error):

128

"""Session pool has been closed."""

129

status = StatusCode.SESSION_POOL_CLOSED

130

131

class QueryCacheEmptyError(RetryableError):

132

"""Query not found in prepared query cache."""

133

```

134

135

### Connection Errors

136

137

Network and transport-level error conditions.

138

139

```python { .api }

140

class ConnectionError(RetryableError):

141

"""Base class for connection-related errors."""

142

143

class ConnectionLostError(ConnectionError):

144

"""Connection lost during operation."""

145

status = StatusCode.CONNECTION_LOST

146

147

class ConnectionFailureError(ConnectionError):

148

"""Failed to establish connection."""

149

status = StatusCode.CONNECTION_FAILURE

150

151

class DeadlineExceededError(ConnectionError):

152

"""Operation exceeded deadline."""

153

status = StatusCode.DEADLINE_EXCEEDED

154

155

class ClientInternalError(Error):

156

"""Client-side internal errors."""

157

status = StatusCode.CLIENT_INTERNAL_ERROR

158

159

class ClientResourceExhaustedError(Error):

160

"""Client resources exhausted."""

161

162

class ClientDiscoveryError(ConnectionError):

163

"""Endpoint discovery failures."""

164

165

class UnauthenticatedError(Error):

166

"""Authentication credential errors."""

167

status = StatusCode.UNAUTHENTICATED

168

169

class UnimplementedError(Error):

170

"""Unimplemented features or operations."""

171

status = StatusCode.UNIMPLEMENTED

172

```

173

174

### Status Codes

175

176

Enumeration of all possible YDB status codes.

177

178

```python { .api }

179

@enum.unique
class StatusCode(enum.IntEnum):
    """YDB operation status codes.

    ``@enum.unique`` guarantees every member has a distinct value, so a
    numeric code always maps back to exactly one member (two members sharing
    a value would silently turn the second into an alias of the first).
    """

    # Success
    SUCCESS = 400000

    # Server-reported statuses (values of the Ydb.StatusIds protobuf enum)
    BAD_REQUEST = 400010
    UNAUTHORIZED = 400020
    INTERNAL_ERROR = 400030
    ABORTED = 400040
    UNAVAILABLE = 400050
    OVERLOADED = 400060
    SCHEME_ERROR = 400070
    GENERIC_ERROR = 400080
    TIMEOUT = 400090
    BAD_SESSION = 400100
    PRECONDITION_FAILED = 400120
    ALREADY_EXISTS = 400130
    NOT_FOUND = 400140
    SESSION_EXPIRED = 400150
    CANCELLED = 400160
    UNDETERMINED = 400170
    UNSUPPORTED = 400180
    SESSION_BUSY = 400190

    # NOTE(review): no FORBIDDEN member appears to exist in Ydb.StatusIds;
    # the previously documented value is kept unchanged - confirm or remove.
    FORBIDDEN = 403

    # Transport errors
    CONNECTION_LOST = 401010
    CONNECTION_FAILURE = 401020
    DEADLINE_EXCEEDED = 401030
    CLIENT_INTERNAL_ERROR = 401040
    UNIMPLEMENTED = 401050

    # Client pool errors
    UNAUTHENTICATED = 402030
    SESSION_POOL_EMPTY = 402040
    # NOTE(review): presumably SDK-documentation specific - verify upstream.
    SESSION_POOL_CLOSED = 402050

221

222

def is_retryable_error(error: Exception) -> bool:

223

"""

224

Check if error is retryable.

225

226

Args:

227

error (Exception): Error to check

228

229

Returns:

230

bool: True if error can be retried

231

"""

232

233

def get_error_class(status_code: int) -> Type[Error]:

234

"""

235

Get error class for status code.

236

237

Args:

238

status_code (int): YDB status code

239

240

Returns:

241

Type[Error]: Appropriate error class

242

"""

243

```

244

245

### Issue Messages

246

247

Detailed error information with nested issue structures.

248

249

```python { .api }

250

class IssueMessage:

251

def __init__(

252

self,

253

message: str,

254

issue_code: int = None,

255

severity: int = None,

256

issues: List['IssueMessage'] = None

257

):

258

"""

259

Detailed error issue information.

260

261

Args:

262

message (str): Issue description

263

issue_code (int, optional): Issue-specific code

264

severity (int, optional): Issue severity level

265

issues (List[IssueMessage], optional): Nested sub-issues

266

"""

267

268

@property

269

def message(self) -> str:

270

"""Issue description message."""

271

272

@property

273

def issue_code(self) -> Optional[int]:

274

"""Issue-specific code."""

275

276

@property

277

def severity(self) -> Optional[int]:

278

"""Issue severity level."""

279

280

@property

281

def issues(self) -> List['IssueMessage']:

282

"""Nested sub-issues."""

283

284

def to_dict(self) -> dict:

285

"""

286

Convert issue to dictionary representation.

287

288

Returns:

289

dict: Issue as dictionary

290

"""

291

292

def __str__(self) -> str:

293

"""String representation of the issue."""

294

295

class IssueSeverity(enum.IntEnum):

296

"""Issue severity levels."""

297

INFO = 1

298

NOTICE = 2

299

WARNING = 3

300

ERROR = 4

301

FATAL = 5

302

```

303

304

### Retry Configuration

305

306

Configurable retry strategies with exponential backoff and jitter.

307

308

```python { .api }

309

class RetrySettings:

310

def __init__(

311

self,

312

max_retries: int = 10,

313

max_session_acquire_timeout: float = None,

314

fast_backoff_settings: BackoffSettings = None,

315

slow_backoff_settings: BackoffSettings = None,

316

retry_not_found: bool = False,

317

retry_internal_error: bool = True,

318

unknown_error_handler: Callable[[Exception], bool] = None,

319

on_ydb_error_callback: Callable[[YdbError], None] = None

320

):

321

"""

322

Retry configuration for YDB operations.

323

324

Args:

325

max_retries (int): Maximum number of retry attempts

326

max_session_acquire_timeout (float, optional): Session acquisition timeout

327

fast_backoff_settings (BackoffSettings, optional): Fast retry backoff

328

slow_backoff_settings (BackoffSettings, optional): Slow retry backoff

329

retry_not_found (bool): Whether to retry NotFound errors

330

retry_internal_error (bool): Whether to retry internal errors

331

unknown_error_handler (Callable, optional): Handler for unknown errors

332

on_ydb_error_callback (Callable, optional): Error callback function

333

"""

334

335

@property

336

def max_retries(self) -> int:

337

"""Maximum retry attempts."""

338

339

@property

340

def fast_backoff_settings(self) -> BackoffSettings:

341

"""Fast backoff configuration."""

342

343

@property

344

def slow_backoff_settings(self) -> BackoffSettings:

345

"""Slow backoff configuration."""

346

347

def with_max_retries(self, max_retries: int) -> 'RetrySettings':

348

"""

349

Create copy with different max retries.

350

351

Args:

352

max_retries (int): New max retry count

353

354

Returns:

355

RetrySettings: New retry settings instance

356

"""

357

358

def with_fast_backoff(self, backoff: BackoffSettings) -> 'RetrySettings':

359

"""

360

Create copy with different fast backoff.

361

362

Args:

363

backoff (BackoffSettings): New fast backoff settings

364

365

Returns:

366

RetrySettings: New retry settings instance

367

"""

368

369

class BackoffSettings:

370

def __init__(

371

self,

372

slot_duration: float = 1.0,

373

ceiling: int = 6,

374

max_backoff: float = 32.0,

375

jitter_limit: float = 1.0,

376

uncertain_ratio: float = 0.1

377

):

378

"""

379

Exponential backoff configuration.

380

381

Args:

382

slot_duration (float): Base slot duration in seconds

383

ceiling (int): Backoff ceiling (2^ceiling * slot_duration max)

384

max_backoff (float): Maximum backoff time in seconds

385

jitter_limit (float): Maximum jitter multiplier

386

uncertain_ratio (float): Ratio for uncertain error handling

387

"""

388

389

@property

390

def slot_duration(self) -> float:

391

"""Base slot duration."""

392

393

@property

394

def ceiling(self) -> int:

395

"""Backoff ceiling exponent."""

396

397

@property

398

def max_backoff(self) -> float:

399

"""Maximum backoff time."""

400

401

def calculate_backoff(self, attempt: int) -> float:

402

"""

403

Calculate backoff time for attempt.

404

405

Args:

406

attempt (int): Retry attempt number (0-based)

407

408

Returns:

409

float: Backoff time in seconds

410

"""

411

412

def with_jitter(self, backoff: float) -> float:

413

"""

414

Apply jitter to backoff time.

415

416

Args:

417

backoff (float): Base backoff time

418

419

Returns:

420

float: Jittered backoff time

421

"""

422

423

# Predefined backoff settings

424

DEFAULT_FAST_BACKOFF = BackoffSettings(slot_duration=0.005, ceiling=10, max_backoff=0.2)

425

DEFAULT_SLOW_BACKOFF = BackoffSettings(slot_duration=1.0, ceiling=6, max_backoff=32.0)

426

```

427

428

### Retry Operations

429

430

High-level retry functionality for database operations.

431

432

```python { .api }

433

def retry_operation_sync(

434

callee: Callable[..., Any],

435

retry_settings: RetrySettings = None,

436

session_pool: SessionPool = None,

437

*args,

438

**kwargs

439

) -> Any:

440

"""

441

Execute operation with retry logic.

442

443

Args:

444

callee (Callable): Function to execute

445

retry_settings (RetrySettings, optional): Retry configuration

446

session_pool (SessionPool, optional): Session pool for session-based operations

447

*args: Arguments for callee

448

**kwargs: Keyword arguments for callee

449

450

Returns:

451

Any: Result of successful callee execution

452

453

Raises:

454

Error: Final error if all retries exhausted

455

"""

456

457

async def retry_operation(

458

callee: Callable[..., Awaitable[Any]],

459

retry_settings: RetrySettings = None,

460

session_pool: SessionPool = None,

461

*args,

462

**kwargs

463

) -> Any:

464

"""

465

Execute async operation with retry logic.

466

467

Args:

468

callee (Callable): Async function to execute

469

retry_settings (RetrySettings, optional): Retry configuration

470

session_pool (SessionPool, optional): Session pool for session-based operations

471

*args: Arguments for callee

472

**kwargs: Keyword arguments for callee

473

474

Returns:

475

Any: Result of successful callee execution

476

477

Raises:

478

Error: Final error if all retries exhausted

479

"""

480

481

class YdbRetryOperationSleepOpt:

482

def __init__(

483

self,

484

timeout: float = None,

485

backoff_settings: BackoffSettings = None

486

):

487

"""

488

Sleep options for retry operations.

489

490

Args:

491

timeout (float, optional): Maximum sleep time

492

backoff_settings (BackoffSettings, optional): Backoff configuration

493

"""

494

495

class YdbRetryOperationFinalResult:

496

def __init__(

497

self,

498

result: Any = None,

499

error: Exception = None,

500

attempts: int = 0

501

):

502

"""

503

Final result of retry operation.

504

505

Args:

506

result (Any, optional): Operation result if successful

507

error (Exception, optional): Final error if failed

508

attempts (int): Number of attempts made

509

"""

510

511

@property

512

def is_success(self) -> bool:

513

"""True if operation succeeded."""

514

515

@property

516

def is_failure(self) -> bool:

517

"""True if operation failed."""

518

519

def retry_operation_impl(

520

callee: Callable,

521

retry_settings: RetrySettings = None,

522

*args,

523

**kwargs

524

) -> YdbRetryOperationFinalResult:

525

"""

526

Low-level retry implementation.

527

528

Args:

529

callee (Callable): Function to execute

530

retry_settings (RetrySettings, optional): Retry configuration

531

*args: Arguments for callee

532

**kwargs: Keyword arguments for callee

533

534

Returns:

535

YdbRetryOperationFinalResult: Operation result with metadata

536

"""

537

```

538

539

### Error Classification

540

541

Utilities for categorizing and handling different error types.

542

543

```python { .api }

544

def classify_error(error: Exception) -> ErrorCategory:

545

"""

546

Classify error into category for handling strategy.

547

548

Args:

549

error (Exception): Error to classify

550

551

Returns:

552

ErrorCategory: Error category

553

"""

554

555

class ErrorCategory(enum.Enum):

556

"""Error classification categories."""

557

RETRIABLE_FAST = "retriable_fast" # Quick retry with fast backoff

558

RETRIABLE_SLOW = "retriable_slow" # Retry with slow backoff

559

RETRIABLE_UNCERTAIN = "retriable_uncertain" # Uncertain outcome, careful retry

560

NON_RETRIABLE = "non_retriable" # Don't retry these errors

561

FATAL = "fatal" # Fatal errors, stop immediately

562

563

def is_transport_error(error: Exception) -> bool:

564

"""

565

Check if error is transport/network related.

566

567

Args:

568

error (Exception): Error to check

569

570

Returns:

571

bool: True if transport error

572

"""

573

574

def is_server_error(error: Exception) -> bool:

575

"""

576

Check if error is server-side.

577

578

Args:

579

error (Exception): Error to check

580

581

Returns:

582

bool: True if server error

583

"""

584

585

def is_client_error(error: Exception) -> bool:

586

"""

587

Check if error is client-side.

588

589

Args:

590

error (Exception): Error to check

591

592

Returns:

593

bool: True if client error

594

"""

595

596

def should_retry_error(

597

error: Exception,

598

retry_settings: RetrySettings = None

599

) -> bool:

600

"""

601

Determine if error should be retried based on settings.

602

603

Args:

604

error (Exception): Error to evaluate

605

retry_settings (RetrySettings, optional): Retry configuration

606

607

Returns:

608

bool: True if error should be retried

609

"""

610

611

def get_retry_backoff(

612

error: Exception,

613

attempt: int,

614

retry_settings: RetrySettings = None

615

) -> float:

616

"""

617

Calculate appropriate backoff time for error and attempt.

618

619

Args:

620

error (Exception): Error that occurred

621

attempt (int): Retry attempt number

622

retry_settings (RetrySettings, optional): Retry configuration

623

624

Returns:

625

float: Backoff time in seconds

626

"""

627

```

628

629

### Error Context

630

631

Context management for error handling and debugging.

632

633

```python { .api }

634

class ErrorContext:

635

def __init__(

636

self,

637

operation: str = None,

638

request_id: str = None,

639

session_id: str = None,

640

endpoint: str = None,

641

database: str = None

642

):

643

"""

644

Context information for error analysis.

645

646

Args:

647

operation (str, optional): Operation being performed

648

request_id (str, optional): Request identifier

649

session_id (str, optional): Session identifier

650

endpoint (str, optional): YDB endpoint

651

database (str, optional): Database path

652

"""

653

654

@property

655

def operation(self) -> Optional[str]:

656

"""Operation being performed."""

657

658

@property

659

def request_id(self) -> Optional[str]:

660

"""Request identifier."""

661

662

@property

663

def session_id(self) -> Optional[str]:

664

"""Session identifier."""

665

666

def to_dict(self) -> dict:

667

"""Convert context to dictionary."""

668

669

def __str__(self) -> str:

670

"""String representation of context."""

671

672

class ErrorHandler:

673

def __init__(

674

self,

675

logger: logging.Logger = None,

676

context: ErrorContext = None

677

):

678

"""

679

Error handling utilities.

680

681

Args:

682

logger (logging.Logger, optional): Logger for error reporting

683

context (ErrorContext, optional): Error context information

684

"""

685

686

def handle_error(

687

self,

688

error: Exception,

689

operation: str = None

690

) -> bool:

691

"""

692

Handle error with appropriate logging and classification.

693

694

Args:

695

error (Exception): Error to handle

696

operation (str, optional): Operation context

697

698

Returns:

699

bool: True if error was handled

700

"""

701

702

def should_retry(

703

self,

704

error: Exception,

705

attempt: int,

706

max_retries: int = 10

707

) -> bool:

708

"""

709

Determine if operation should be retried.

710

711

Args:

712

error (Exception): Error that occurred

713

attempt (int): Current attempt number

714

max_retries (int): Maximum retry attempts

715

716

Returns:

717

bool: True if should retry

718

"""

719

720

def log_error(

721

self,

722

error: Exception,

723

level: int = logging.ERROR,

724

extra_context: dict = None

725

):

726

"""

727

Log error with context information.

728

729

Args:

730

error (Exception): Error to log

731

level (int): Log level

732

extra_context (dict, optional): Additional context

733

"""

734

```

735

736

## Usage Examples

737

738

### Basic Error Handling

739

740

```python

741

import ydb

742

import logging

743

744

# Configure logging

745

logging.basicConfig(level=logging.INFO)

746

logger = logging.getLogger(__name__)

747

748

def handle_ydb_operations():
    """Run a sample query with automatic retries and per-category handling.

    Returns:
        The value returned by ``retry_operation_sync`` on success, or None
        when one of the except branches handled an error.
    """
    driver = ydb.Driver(endpoint="grpc://localhost:2136", database="/local")
    # Explicit sentinels instead of probing locals(): the pool only exists
    # once driver.wait() has succeeded.
    session_pool = None
    result = None

    try:
        driver.wait(fail_fast=True, timeout=5)
        session_pool = ydb.SessionPool(driver)

        def execute_query(session):
            return session.execute_query("SELECT COUNT(*) FROM users")

        # Execute with automatic retry
        result = session_pool.retry_operation_sync(execute_query)

    except ydb.ConnectionError as e:
        logger.error(f"Connection failed: {e}")
        # Handle connection issues - maybe use backup endpoint

    except ydb.UnauthorizedError as e:
        logger.error(f"Authentication failed: {e}")
        # Handle auth issues - refresh credentials

    except ydb.NotFoundError as e:
        logger.error(f"Resource not found: {e}")
        # Handle missing resources - create or use default

    except ydb.RetryableError as e:
        logger.warning(f"Retriable error occurred: {e}")
        # retry_operation_sync already retries these; reaching this branch
        # presumably means its retry budget was exhausted.

    except ydb.Error as e:
        logger.error(f"YDB error: {e}")
        logger.error(f"Status code: {e.status}")
        for issue in e.issues:
            logger.error(f"Issue: {issue.message}")

    except Exception as e:
        logger.error(f"Unexpected error: {e}")

    finally:
        if session_pool is not None:
            session_pool.stop()
        # The driver is created before the try block, so it always exists here.
        driver.stop()

    return result

791

```

792

793

### Custom Retry Configuration

794

795

```python

796

# Configure custom retry behavior

797

def configure_custom_retries():
    """Build a RetrySettings object with tuned fast and slow backoff policies."""
    # Quick operations: 1ms base slot, exponent capped at 8 (up to 256ms),
    # hard ceiling of 500ms, 10% jitter.
    quick_policy = ydb.BackoffSettings(
        slot_duration=0.001,
        ceiling=8,
        max_backoff=0.5,
        jitter_limit=0.1,
    )

    # Heavy operations: 2s base slot, exponent capped at 4 (up to 32s),
    # hard ceiling of one minute, 20% jitter.
    heavy_policy = ydb.BackoffSettings(
        slot_duration=2.0,
        ceiling=4,
        max_backoff=60.0,
        jitter_limit=0.2,
    )

    return ydb.RetrySettings(
        max_retries=5,
        fast_backoff_settings=quick_policy,
        slow_backoff_settings=heavy_policy,
        retry_not_found=False,      # NOT_FOUND is not retried
        retry_internal_error=True,  # internal errors are retried
        on_ydb_error_callback=lambda error: logger.warning(f"Retrying after: {error}"),
    )

825

826

# Use custom retry settings

827

custom_retry_settings = configure_custom_retries()

828

829

def robust_operation(session):
    """Run the sample large-table query on ``session`` and return its result."""
    query_text = "SELECT * FROM large_table WHERE complex_condition = true"
    return session.execute_query(query_text)

834

835

result = session_pool.retry_operation_sync(

836

robust_operation,

837

retry_settings=custom_retry_settings

838

)

839

```

840

841

### Error Classification and Handling

842

843

```python

844

def classify_and_handle_error(error: Exception) -> bool:
    """
    Log the error according to its type and decide whether to keep going.

    The checks run from most specific to most general, so subclasses of
    ``ydb.RetryableError`` are matched before the generic retryable branch.

    Returns:
        bool: True if operation should continue, False if should abort
    """
    import time

    if isinstance(error, ydb.BadRequestError):
        logger.error(f"Bad request - fix query: {error}")
        return False  # Don't continue with bad requests

    if isinstance(error, ydb.UnauthorizedError):
        logger.error(f"Auth failed - refresh credentials: {error}")
        # Could refresh credentials here
        return False

    if isinstance(error, ydb.NotFoundError):
        logger.warning(f"Resource not found: {error}")
        # Might create missing resource
        return True

    if isinstance(error, ydb.OverloadedError):
        logger.warning(f"Service overloaded: {error}")
        # Fixed pause before the caller retries
        time.sleep(5.0)
        return True

    if isinstance(error, ydb.SessionExpiredError):
        logger.info(f"Session expired - will get new session: {error}")
        return True  # Session pool will handle

    if isinstance(error, ydb.ConnectionError):
        logger.warning(f"Connection issue: {error}")
        # Could try alternative endpoint
        return True

    if isinstance(error, ydb.RetryableError):
        logger.info(f"Retriable error: {error}")
        return True

    logger.error(f"Non-retriable error: {error}")
    return False

889

890

# Example usage with manual retry logic

891

def manual_retry_operation(operation_func, max_attempts=3):
    """
    Call ``operation_func`` until it succeeds or the attempts run out.

    Args:
        operation_func (Callable): Zero-argument callable to execute
        max_attempts (int): Maximum number of calls to make

    Returns:
        Result of the first successful call; None if ``max_attempts`` is
        zero or negative (the loop body never runs).

    Raises:
        Exception: The last error, re-raised when classify_and_handle_error
            says not to continue or when the final attempt fails
    """
    # Imported here because this snippet has no module-level ``import time``;
    # without it the sleep below raises NameError on the first retry.
    import time

    for attempt in range(max_attempts):
        try:
            return operation_func()

        except Exception as e:
            should_continue = classify_and_handle_error(e)

            if not should_continue or attempt == max_attempts - 1:
                raise  # Re-raise if shouldn't continue or final attempt

            # Exponential backoff: 1s, 2s, 4s, ... capped at 10s
            backoff_time = min(2 ** attempt, 10)
            logger.info(f"Retrying in {backoff_time}s (attempt {attempt + 1}/{max_attempts})")
            time.sleep(backoff_time)

906

```

907

908

### Async Error Handling

909

910

```python

911

import asyncio

912

import ydb.aio as ydb_aio

913

914

async def async_error_handling():
    """Demonstrate async error handling patterns.

    Returns:
        The value returned by ``pool.retry_operation`` on success, or None
        when one of the except branches handled an error.

    Raises:
        asyncio.CancelledError: Re-raised after logging so cancellation
            still propagates to the caller
    """
    # The except clauses below name classes from the top-level package, but
    # ``import ydb.aio as ydb_aio`` binds only ``ydb_aio``.
    import ydb

    async with ydb_aio.Driver(
        endpoint="grpc://localhost:2136",
        database="/local"
    ) as driver:

        try:
            await driver.wait(fail_fast=True, timeout=5)

            async with ydb_aio.SessionPool(driver) as pool:

                async def async_operation(session):
                    return await session.execute_query(
                        "SELECT * FROM users WHERE active = true"
                    )

                # Use async retry
                return await pool.retry_operation(async_operation)

        except ydb.ConnectionError as e:
            logger.error(f"Async connection failed: {e}")
            # Handle async connection issues

        except ydb.TimeoutError as e:
            logger.error(f"Async operation timed out: {e}")
            # Handle timeout with alternative strategy

        except asyncio.CancelledError:
            logger.info("Operation cancelled")
            raise  # Re-raise cancellation

        except Exception as e:
            logger.error(f"Unexpected async error: {e}")

    return None

949

950

# Run async error handling

951

asyncio.run(async_error_handling())

952

```

953

954

### Error Recovery Strategies

955

956

```python

957

class ErrorRecoveryManager:
    """Manages error recovery strategies for YDB operations.

    Combines retrying through the session pool, a failure-count circuit
    breaker, and a fallback callable or cached default data.
    """

    def __init__(self, driver: "ydb.Driver"):
        """
        Args:
            driver (ydb.Driver): Driver used to create session pools
        """
        self.driver = driver
        self.session_pool = ydb.SessionPool(driver)
        self.fallback_data = {}
        self.circuit_breaker_failures = 0
        self.circuit_breaker_threshold = 5
        self.circuit_breaker_reset_time = 60  # seconds
        self.last_failure_time = 0

    def execute_with_recovery(self, operation_func, fallback_func=None):
        """Execute operation with comprehensive recovery strategy.

        Args:
            operation_func (Callable): Callable passed to
                ``session_pool.retry_operation_sync``
            fallback_func (Callable, optional): Zero-argument callable used
                when the primary operation cannot be completed

        Returns:
            The operation result, or the fallback result when the operation
            fails or the circuit breaker is open.
        """
        # Check circuit breaker
        if self._is_circuit_breaker_open():
            logger.warning("Circuit breaker open, using fallback")
            return self._execute_fallback(fallback_func)

        try:
            # Attempt primary operation
            result = self.session_pool.retry_operation_sync(
                operation_func,
                retry_settings=ydb.RetrySettings(max_retries=3)
            )

        except ydb.OverloadedError:
            # Specific handling for overload
            logger.warning("Service overloaded, implementing backoff")
            self._handle_overload()
            # Counted after the pause so repeated overloads lengthen the next
            # backoff and eventually trip the circuit breaker.
            self._record_failure()
            return self._execute_fallback(fallback_func)

        except ydb.SessionPoolEmptyError:
            # Handle session pool exhaustion
            logger.warning("Session pool exhausted, creating new pool")
            self._recreate_session_pool()
            # Retry once with new pool
            try:
                return self.session_pool.retry_operation_sync(operation_func)
            except Exception as e:
                logger.error(f"Retry with new session pool failed: {e}")
                return self._execute_fallback(fallback_func)

        except ydb.ConnectionError:
            # Handle connection issues
            logger.error("Connection failed, trying endpoint discovery")
            self._record_failure()
            self._handle_connection_failure()
            return self._execute_fallback(fallback_func)

        except ydb.RetryableError as e:
            # Track retriable failures for circuit breaker
            self._record_failure()
            logger.error(f"Retriable error after retries: {e}")
            return self._execute_fallback(fallback_func)

        except Exception as e:
            logger.error(f"Unhandled error: {e}")
            return self._execute_fallback(fallback_func)

        # Success - reset circuit breaker
        self.circuit_breaker_failures = 0
        return result

    def _record_failure(self):
        """Count one failure toward the circuit breaker and timestamp it."""
        import time

        self.circuit_breaker_failures += 1
        self.last_failure_time = time.time()

    def _is_circuit_breaker_open(self) -> bool:
        """Check if circuit breaker should be open.

        Returns:
            bool: True while the failure count has reached the threshold and
            the reset interval has not yet elapsed since the last failure.
        """
        import time

        if self.circuit_breaker_failures < self.circuit_breaker_threshold:
            return False

        # Enough time has passed since the last failure: close the breaker.
        if time.time() - self.last_failure_time > self.circuit_breaker_reset_time:
            self.circuit_breaker_failures = 0
            return False

        return True

    def _execute_fallback(self, fallback_func):
        """Execute fallback strategy.

        Returns the fallback callable's result; if none is given or it
        raises, returns ``fallback_data["default"]`` (an empty list when
        that key is missing).
        """
        if fallback_func:
            try:
                return fallback_func()
            except Exception as e:
                logger.error(f"Fallback also failed: {e}")

        # Return cached/default data
        logger.info("Using cached fallback data")
        return self.fallback_data.get("default", [])

    def _handle_overload(self):
        """Handle service overload with exponential backoff (capped at 30s)."""
        import time

        backoff_time = min(2 ** self.circuit_breaker_failures, 30)
        logger.info(f"Backing off for {backoff_time}s due to overload")
        time.sleep(backoff_time)

    def _recreate_session_pool(self):
        """Recreate session pool if exhausted."""
        # Stopping the old pool is best-effort; a failure here must not
        # prevent the replacement pool from being created.
        try:
            self.session_pool.stop()
        except Exception as e:
            logger.warning(f"Failed to stop exhausted session pool: {e}")

        try:
            self.session_pool = ydb.SessionPool(self.driver, size=20)
        except Exception as e:
            logger.error(f"Failed to recreate session pool: {e}")

    def _handle_connection_failure(self):
        """Handle connection failures with endpoint rotation."""
        # Could implement endpoint discovery refresh here
        logger.info("Connection failure - would refresh endpoints")

1063

1064

# Usage example

1065

recovery_manager = ErrorRecoveryManager(driver)

1066

1067

def get_user_data(session):
    """Fetch up to 100 rows from the users table through ``session``."""
    query_text = "SELECT * FROM users LIMIT 100"
    return session.execute_query(query_text)

1069

1070

def fallback_user_data():
    """Return a one-element list holding a default user record."""
    default_user = {"id": 1, "name": "Default User"}
    return [default_user]

1073

1074

# Execute with comprehensive error recovery

1075

user_data = recovery_manager.execute_with_recovery(

1076

get_user_data,

1077

fallback_user_data

1078

)

1079

```

1080

1081

### Error Monitoring and Metrics

1082

1083

```python

1084

from collections import defaultdict

1085

import time

1086

1087

class ErrorMetricsCollector:
    """Collect and report error metrics for monitoring.

    All mutable state is guarded by a lock so that one thread can build a
    summary while other threads record errors. Iterating ``error_rates``
    unguarded can fail with "dictionary changed size during iteration"
    when a new error type is recorded at the same time.

    Args:
        rate_window_seconds: Width of the window used for the reported
            rates and for alerting. Defaults to 60 (errors per minute).
        retention_seconds: How long error timestamps are kept. Defaults to
            3600 (one hour).
        alert_threshold: ``should_alert`` returns True when any error type
            has strictly more than this many errors inside the rate
            window. Defaults to 10.
    """

    def __init__(self, rate_window_seconds=60, retention_seconds=3600,
                 alert_threshold=10):
        # Local import so the class works regardless of where (or whether)
        # the surrounding module imports ``threading``.
        import threading

        self.rate_window_seconds = rate_window_seconds
        self.retention_seconds = retention_seconds
        self.alert_threshold = alert_threshold

        self.error_counts = defaultdict(int)   # error type -> total count
        self.error_rates = defaultdict(list)   # error type -> timestamps
        self.retry_counts = defaultdict(int)   # "<type>_retry_<n>" -> count
        self.start_time = time.time()
        self._lock = threading.Lock()

    def record_error(self, error: Exception, operation: str = "unknown"):
        """Record error occurrence for metrics.

        Args:
            error: The exception that occurred; its class name is the key
                under which it is counted.
            operation: Label for the failed operation, included in the log
                record only.
        """
        error_type = type(error).__name__
        current_time = time.time()
        cutoff_time = current_time - self.retention_seconds

        with self._lock:
            # Count by error type
            self.error_counts[error_type] += 1

            # Track timestamps, dropping entries older than the retention
            # period for this error type.
            timestamps = self.error_rates[error_type]
            timestamps.append(current_time)
            self.error_rates[error_type] = [
                t for t in timestamps if t > cutoff_time
            ]

        # Log structured error info (outside the lock: logging may block).
        logger.info(
            "error_occurred",
            extra={
                "error_type": error_type,
                "operation": operation,
                "error_message": str(error),
                "status_code": getattr(error, 'status', None)
            }
        )

    def record_retry(self, error_type: str, attempt: int):
        """Record retry attempt under the key ``"<error_type>_retry_<attempt>"``."""
        with self._lock:
            self.retry_counts[f"{error_type}_retry_{attempt}"] += 1

    def _recent_error_counts(self, current_time):
        """Count errors per type inside the rate window.

        The caller must hold ``self._lock``.
        """
        window = self.rate_window_seconds
        return {
            error_type: sum(1 for t in timestamps if current_time - t < window)
            for error_type, timestamps in self.error_rates.items()
        }

    def get_error_summary(self) -> dict:
        """Get current error summary for monitoring.

        Returns:
            A dict with the keys ``uptime_seconds``, ``total_errors``,
            ``error_counts``, ``retry_counts`` and
            ``error_rates_per_minute``. The counter dicts are copies, so
            the caller may keep or modify them freely.
        """
        current_time = time.time()
        with self._lock:
            return {
                "uptime_seconds": current_time - self.start_time,
                "total_errors": sum(self.error_counts.values()),
                "error_counts": dict(self.error_counts),
                "retry_counts": dict(self.retry_counts),
                "error_rates_per_minute": self._recent_error_counts(current_time),
            }

    def should_alert(self) -> bool:
        """Check if error rates warrant alerting.

        Returns:
            True when any error type exceeds ``alert_threshold`` errors
            inside the rate window; a warning is logged for the first such
            type found.
        """
        current_time = time.time()
        with self._lock:
            recent_by_type = self._recent_error_counts(current_time)

        for error_type, recent_errors in recent_by_type.items():
            if recent_errors > self.alert_threshold:
                logger.warning(f"High error rate: {recent_errors}/min for {error_type}")
                return True

        return False


# Global metrics collector
metrics = ErrorMetricsCollector()

1163

1164

# Enhanced retry operation with metrics

1165

def retry_with_metrics(operation_func, operation_name="unknown", max_attempts=3):
    """Execute operation with error metrics collection.

    Calls ``operation_func`` up to ``max_attempts`` times with no delay
    between attempts. Every failure is recorded in the global ``metrics``
    collector, and the alert condition is checked after every failure,
    including the final one.

    Args:
        operation_func: Zero-argument callable to execute.
        operation_name: Label attached to the recorded errors.
        max_attempts: Total number of calls before giving up. Defaults to 3.

    Returns:
        Whatever ``operation_func`` returns on its first successful call.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        Exception: The exception raised by the final failed attempt,
            re-raised unchanged.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        try:
            return operation_func()

        except Exception as e:
            # Record error metrics; ``attempt`` is the 1-based number of
            # the attempt that just failed.
            metrics.record_error(e, operation_name)
            metrics.record_retry(type(e).__name__, attempt)

            # Check the alert condition before giving up so that the
            # final failure can trigger the alert as well.
            if metrics.should_alert():
                logger.critical("High error rates detected - consider investigation")

            if attempt == max_attempts:  # Final attempt
                raise

1183

1184

# Usage with metrics

1185

def monitored_database_operation():
    """Run the order-count query through the session pool with metrics.

    The pool call is wrapped in ``retry_with_metrics`` under the operation
    name ``"order_count_query"``.
    """

    def count_orders(session):
        return session.execute_query("SELECT COUNT(*) FROM orders")

    def run_query():
        return session_pool.retry_operation_sync(count_orders)

    return retry_with_metrics(run_query, "order_count_query")

1193

1194

# Periodic metrics reporting

1195

def report_metrics():
    """Log the current error summary as an ``error_summary`` record."""
    snapshot = metrics.get_error_summary()
    logger.info("error_summary", extra=snapshot)

    # Could send to monitoring system here
    # send_to_monitoring(snapshot)

1201

1202

# Schedule periodic reporting

1203

import threading

1204

def periodic_reporting(interval_seconds=300):
    """Report metrics forever, sleeping ``interval_seconds`` before each report.

    A failing report is logged and the loop keeps running; otherwise a
    single exception would end the thread and silently stop all further
    reporting.

    Args:
        interval_seconds: Pause before each report. Defaults to 300
            (report every 5 minutes).
    """
    while True:
        time.sleep(interval_seconds)
        try:
            report_metrics()
        except Exception:
            logger.exception("Periodic metrics reporting failed")


# Daemon thread: it does not keep the process alive at interpreter exit.
reporting_thread = threading.Thread(target=periodic_reporting, daemon=True)
reporting_thread.start()

1211

```

1212

1213

## Type Definitions

1214

1215

```python { .api }

1216

# Type aliases for error handling
ErrorHandler = Callable[[Exception], bool]    # exception -> bool
ErrorCallback = Callable[[Exception], None]   # exception -> no result
RetryDecision = bool
BackoffTime = float

# Error classification
ErrorClassifier = Callable[[Exception], ErrorCategory]       # exception -> category
RetryStrategy = Callable[[Exception, int], RetryDecision]    # (exception, int) -> decision
BackoffCalculator = Callable[[Exception, int], BackoffTime]  # (exception, int) -> backoff

# Monitoring types
ErrorMetric = Dict[str, Union[int, float, str]]   # string keys, scalar values
MetricsReporter = Callable[[ErrorMetric], None]   # metric -> no result
AlertTrigger = Callable[[ErrorMetric], bool]      # metric -> bool

1231

```