# Exception Handling

Comprehensive exception classes for handling transfer failures, retry exhaustion, coordination errors, and other exceptional conditions in S3 transfer operations.

## Capabilities

### Transfer Operation Exceptions

Core exceptions for S3 transfer operation failures, with specific error contexts and recovery information.

```python { .api }
class RetriesExceededError(Exception):
    """
    Raised when the maximum number of retries is exceeded during transfer operations.

    Args:
        last_exception: The final exception that caused the retry failure
    """
    def __init__(self, last_exception): ...

    @property
    def last_exception(self):
        """
        The last exception that occurred before retries were exhausted.

        Returns:
            Exception: The final exception that caused failure
        """

class S3UploadFailedError(Exception):
    """
    Raised when an S3 upload operation fails.

    Typically wraps underlying exceptions from S3 operations or network failures.
    """

class S3DownloadFailedError(Exception):
    """
    Raised when an S3 download operation fails.

    Typically wraps underlying exceptions from S3 operations or network failures.
    """
```

### Coordination and State Exceptions

Exceptions related to transfer coordination, state management, and lifecycle issues.

```python { .api }
class TransferNotDoneError(Exception):
    """
    Raised when attempting transfer operations before completion.

    Occurs when trying to access results or perform operations on transfers
    that haven't finished executing.
    """

class FatalError(CancelledError):
    """
    Fatal error in TransferManager that causes immediate shutdown.

    Inherits from CancelledError to indicate that the transfer was cancelled
    due to a fatal condition.
    """

class InvalidSubscriberMethodError(Exception):
    """
    Raised when a subscriber method is invalid or improperly implemented.

    Args:
        subscriber: The subscriber object with the invalid method
        method_name (str): Name of the invalid method
        reason (str): Description of why the method is invalid
    """
    def __init__(self, subscriber, method_name: str, reason: str): ...
```

### Queue and Resource Exceptions

Exceptions related to queue operations and resource management.

```python { .api }
class QueueShutdownError(Exception):
    """
    Raised when attempting to put items in a shutdown queue.

    Occurs when trying to add items to a ShutdownQueue after it has been
    triggered for shutdown.
    """
```

### Bandwidth and Rate Limiting Exceptions

Exceptions specific to bandwidth management and rate limiting operations.

```python { .api }
class RequestExceededException(Exception):
    """
    Raised when a bandwidth request exceeds the available capacity.

    Args:
        requested_amt (int): Number of bytes that were requested
        retry_time (float): Time at which the request can be retried
    """
    def __init__(self, requested_amt: int, retry_time: float): ...

    @property
    def requested_amt(self) -> int:
        """Number of bytes that were requested."""

    @property
    def retry_time(self) -> float:
        """Time at which the request can be retried."""
```

## Usage Examples

### Basic Exception Handling

```python
import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import (
    S3UploadFailedError,
    S3DownloadFailedError,
    TransferNotDoneError,
    RetriesExceededError,
)

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Upload with comprehensive error handling
    with open('/tmp/test_file.txt', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')

        try:
            # Wait for completion
            result = future.result()
            print("Upload completed successfully!")

        except S3UploadFailedError as e:
            print(f"Upload failed: {e}")
            # Handle upload-specific failure

        except TransferNotDoneError as e:
            print(f"Transfer not complete: {e}")
            # Handle premature access to results

        except RetriesExceededError as e:
            print(f"Retries exhausted: {e}")
            print(f"Last exception: {e.last_exception}")
            # Handle retry exhaustion

        except Exception as e:
            print(f"Unexpected error: {e}")
            # Handle any other exceptions

finally:
    transfer_manager.shutdown()
```

### Retry Logic with Exception Handling

```python
import time
import random

import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

def upload_with_custom_retry(transfer_manager, fileobj, bucket, key, max_retries=3):
    """Upload with custom retry logic and exception handling."""
    for attempt in range(max_retries + 1):
        try:
            future = transfer_manager.upload(fileobj, bucket, key)
            result = future.result()
            print(f"Upload succeeded on attempt {attempt + 1}")
            return result

        except S3UploadFailedError as e:
            print(f"Upload attempt {attempt + 1} failed: {e}")
            if attempt < max_retries:
                # Exponential backoff with jitter
                delay = (2 ** attempt) + random.uniform(0, 1)
                print(f"Retrying in {delay:.2f} seconds...")
                time.sleep(delay)
                # Reset the file position for the retry
                fileobj.seek(0)
            else:
                print("All retry attempts exhausted")
                raise

        except RetriesExceededError as e:
            print(f"Internal retries exhausted on attempt {attempt + 1}")
            print(f"Last internal exception: {e.last_exception}")
            if attempt < max_retries:
                delay = (2 ** attempt) + random.uniform(0, 1)
                print(f"Retrying entire operation in {delay:.2f} seconds...")
                time.sleep(delay)
                fileobj.seek(0)
            else:
                raise

        except Exception as e:
            print(f"Unexpected error on attempt {attempt + 1}: {e}")
            if attempt >= max_retries:
                raise

# Use the custom retry logic
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    with open('/tmp/test_file.txt', 'rb') as f:
        upload_with_custom_retry(transfer_manager, f, 'my-bucket', 'test_file.txt')
finally:
    transfer_manager.shutdown()
```

### Handling Download Exceptions

```python
import os

import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import S3DownloadFailedError, TransferNotDoneError

def safe_download(transfer_manager, bucket, key, filename):
    """Download with comprehensive error handling and cleanup."""
    temp_filename = filename + '.tmp'

    try:
        # Download to a temporary file first
        with open(temp_filename, 'wb') as f:
            future = transfer_manager.download(bucket, key, f)
            result = future.result()

        # Verify the download completed successfully
        if os.path.getsize(temp_filename) == 0:
            raise S3DownloadFailedError("Downloaded file is empty")

        # Move the temporary file to its final location
        os.rename(temp_filename, filename)
        print(f"Download completed: {filename}")
        return True

    except S3DownloadFailedError as e:
        print(f"Download failed: {e}")
        # Clean up the temporary file
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        return False

    except TransferNotDoneError as e:
        print(f"Download not complete: {e}")
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        return False

    except OSError as e:
        print(f"File system error: {e}")
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        return False

    except Exception as e:
        print(f"Unexpected download error: {e}")
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        raise

# Use safe_download
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    success = safe_download(transfer_manager, 'my-bucket', 'test_file.txt', '/tmp/downloaded.txt')
    if success:
        print("Download completed successfully")
    else:
        print("Download failed")
finally:
    transfer_manager.shutdown()
```

### Bandwidth Exception Handling

```python
import time

from s3transfer.bandwidth import (
    BandwidthLimiter,
    LeakyBucket,
    RequestExceededException,
)
from s3transfer.futures import TransferCoordinator

def handle_bandwidth_limited_operation():
    """Handle bandwidth-limited operations with proper exception handling."""
    # Create a very restrictive bandwidth limiter for demonstration
    max_rate = 1024  # 1 KB/s
    leaky_bucket = LeakyBucket(max_rate)
    bandwidth_limiter = BandwidthLimiter(leaky_bucket)

    class TestStream:
        def __init__(self, data):
            self.data = data
            self.position = 0

        def read(self, amount=None):
            if amount is None:
                amount = len(self.data) - self.position
            end = min(self.position + amount, len(self.data))
            result = self.data[self.position:end]
            self.position = end
            return result

    # Create a bandwidth-limited stream
    test_data = b'x' * 10240  # 10 KB
    stream = TestStream(test_data)
    coordinator = TransferCoordinator()

    # Note: "bandwith" is the method's actual spelling in s3transfer
    limited_stream = bandwidth_limiter.get_bandwith_limited_stream(stream, coordinator)

    total_read = 0
    retries = 0
    max_retries = 10

    print("Starting bandwidth-limited read with exception handling...")

    while total_read < len(test_data) and retries < max_retries:
        try:
            # Try to read data
            chunk = limited_stream.read(2048)  # Try to read 2 KB
            if chunk:
                total_read += len(chunk)
                print(f"Read {len(chunk)} bytes (total: {total_read})")
            else:
                break

        except RequestExceededException as e:
            retries += 1
            print(f"Bandwidth limit exceeded: requested {e.requested_amt} bytes")
            print(f"Can retry after: {e.retry_time}")

            # Calculate how long to wait
            wait_time = e.retry_time - time.time()
            if wait_time > 0:
                print(f"Waiting {wait_time:.2f} seconds...")
                time.sleep(wait_time)
            print(f"Retry attempt {retries}")

        except Exception as e:
            print(f"Unexpected bandwidth error: {e}")
            break

    if retries >= max_retries:
        print("Maximum retries exceeded for bandwidth limiting")
    else:
        print(f"Completed reading {total_read} bytes with {retries} retries")

# Run the bandwidth exception handling example
handle_bandwidth_limited_operation()
```

### Queue Exception Handling

```python
import threading
import time

from s3transfer import ShutdownQueue, QueueShutdownError

def demonstrate_queue_exception_handling():
    """Demonstrate handling of queue shutdown exceptions."""
    # Create a shutdown-aware queue
    queue = ShutdownQueue(maxsize=10)

    def producer_thread():
        """Producer that handles queue shutdown gracefully."""
        try:
            for i in range(20):
                try:
                    item = f"item_{i}"
                    queue.put(item)
                    print(f"Produced: {item}")
                    time.sleep(0.1)
                except QueueShutdownError:
                    print("Producer: Queue has been shut down, stopping production")
                    break
                except Exception as e:
                    print(f"Producer error: {e}")
                    break
        except Exception as e:
            print(f"Producer thread error: {e}")

    def consumer_thread():
        """Consumer that processes items until the queue is shut down."""
        consumed_count = 0
        try:
            while True:
                try:
                    item = queue.get(timeout=2)
                    print(f"Consumed: {item}")
                    consumed_count += 1
                    time.sleep(0.2)
                except Exception:  # Queue empty or other error
                    print("Consumer: No more items or queue error")
                    break
        except Exception as e:
            print(f"Consumer thread error: {e}")
        finally:
            print(f"Consumer processed {consumed_count} items")

    # Start the producer and consumer threads
    producer = threading.Thread(target=producer_thread)
    consumer = threading.Thread(target=consumer_thread)
    producer.start()
    consumer.start()

    # Let them run for a bit, then shut down the queue
    time.sleep(1)
    print("Triggering queue shutdown...")
    queue.trigger_shutdown()

    # Wait for the threads to complete
    producer.join()
    consumer.join()

    # Putting an item after shutdown raises QueueShutdownError
    try:
        queue.put("post_shutdown_item")
    except QueueShutdownError:
        print("Correctly caught QueueShutdownError after shutdown")

# Run the queue exception handling example
demonstrate_queue_exception_handling()
```

### Exception Logging and Monitoring

```python
import logging
import time
from datetime import datetime

import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import (
    S3UploadFailedError,
    S3DownloadFailedError,
    RetriesExceededError,
    TransferNotDoneError,
    InvalidSubscriberMethodError,
    FatalError,
)
from s3transfer.bandwidth import RequestExceededException

class TransferExceptionHandler:
    """Centralized exception handler for transfer operations."""

    def __init__(self, logger_name="s3transfer_exceptions"):
        self.logger = logging.getLogger(logger_name)
        self.exception_counts = {}
        self.last_exception_time = {}

    def handle_exception(self, exception, operation="unknown", **context):
        """Handle and log transfer exceptions with context."""
        exception_type = type(exception).__name__
        current_time = datetime.now()

        # Update statistics
        self.exception_counts[exception_type] = self.exception_counts.get(exception_type, 0) + 1
        self.last_exception_time[exception_type] = current_time

        # Create a detailed log entry
        log_data = {
            'exception_type': exception_type,
            'exception_message': str(exception),
            'operation': operation,
            'timestamp': current_time.isoformat(),
            'count': self.exception_counts[exception_type],
            **context,
        }

        # Log based on the exception type
        if isinstance(exception, (S3UploadFailedError, S3DownloadFailedError)):
            self.logger.error(f"S3 operation failed: {exception}", extra=log_data)

        elif isinstance(exception, RetriesExceededError):
            self.logger.error(f"Retries exhausted: {exception}", extra=log_data)
            if getattr(exception, 'last_exception', None):
                self.logger.error(f"Last retry exception: {exception.last_exception}")

        elif isinstance(exception, TransferNotDoneError):
            self.logger.warning(f"Premature access attempt: {exception}", extra=log_data)

        elif isinstance(exception, FatalError):
            self.logger.critical(f"Fatal transfer error: {exception}", extra=log_data)

        elif isinstance(exception, RequestExceededException):
            self.logger.info(f"Bandwidth limit exceeded: {exception}", extra=log_data)

        else:
            self.logger.error(f"Unexpected exception: {exception}", extra=log_data)

        # Log the stack trace for debugging
        self.logger.debug("Stack trace:", exc_info=True)
        return log_data

    def get_exception_summary(self):
        """Get a summary of handled exceptions."""
        return {
            'exception_counts': dict(self.exception_counts),
            'last_exception_times': {
                k: v.isoformat() for k, v in self.last_exception_time.items()
            },
        }

    def should_retry(self, exception, attempt_count, max_attempts=3):
        """Determine whether an operation should be retried based on the exception type."""
        if attempt_count >= max_attempts:
            return False

        # Don't retry fatal errors
        if isinstance(exception, FatalError):
            return False

        # Don't retry invalid operations
        if isinstance(exception, (TransferNotDoneError, InvalidSubscriberMethodError)):
            return False

        # Retry network and S3 errors
        if isinstance(exception, (S3UploadFailedError, S3DownloadFailedError, RetriesExceededError)):
            return True

        # Retry bandwidth limitations (with a delay)
        if isinstance(exception, RequestExceededException):
            return True

        # Conservative approach for unknown exceptions
        return False

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Use the exception handler
exception_handler = TransferExceptionHandler()

def robust_transfer_operation(transfer_manager, operation_func, max_attempts=3):
    """Perform a transfer operation with comprehensive exception handling."""
    for attempt in range(max_attempts):
        try:
            return operation_func()

        except Exception as e:
            # Handle the exception with context
            context = {
                'attempt': attempt + 1,
                'max_attempts': max_attempts,
                'operation_func': getattr(operation_func, '__name__', 'unknown'),
            }
            exception_handler.handle_exception(e, "transfer_operation", **context)

            # Decide whether to retry
            if exception_handler.should_retry(e, attempt, max_attempts) and attempt < max_attempts - 1:
                delay = 2 ** attempt  # Exponential backoff
                print(f"Retrying in {delay} seconds (attempt {attempt + 2}/{max_attempts})")
                time.sleep(delay)
                continue

            # Re-raise if not retrying or out of attempts
            raise

    # This shouldn't be reached, but just in case
    raise Exception("Maximum attempts reached without success")

# Example usage
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    def upload_operation():
        with open('/tmp/test_file.txt', 'rb') as f:
            future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')
            return future.result()

    # Perform the operation with exception handling
    result = robust_transfer_operation(transfer_manager, upload_operation)
    print("Operation completed successfully!")

    # Print an exception summary
    summary = exception_handler.get_exception_summary()
    if summary['exception_counts']:
        print("Exception summary:", summary)

except Exception as e:
    print(f"Final failure: {e}")

finally:
    transfer_manager.shutdown()
```

### Custom Exception Classes

```python
import os

import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import S3UploadFailedError

class CustomTransferError(Exception):
    """Base class for custom transfer exceptions."""
    pass

class ValidationError(CustomTransferError):
    """Raised when transfer validation fails."""

    def __init__(self, message, validation_type=None, expected=None, actual=None):
        super().__init__(message)
        self.validation_type = validation_type
        self.expected = expected
        self.actual = actual

class QuotaExceededError(CustomTransferError):
    """Raised when a transfer quota is exceeded."""

    def __init__(self, message, quota_type=None, limit=None, current=None):
        super().__init__(message)
        self.quota_type = quota_type
        self.limit = limit
        self.current = current

def validate_and_upload(transfer_manager, filename, bucket, key, max_size=None):
    """Upload with custom validation and exception handling."""
    try:
        # Validate that the file exists and is readable
        if not os.path.isfile(filename):
            raise ValidationError(f"File not found: {filename}", validation_type="file_existence")

        if not os.access(filename, os.R_OK):
            raise ValidationError(f"File not readable: {filename}", validation_type="file_permissions")

        # Validate the file size
        file_size = os.path.getsize(filename)
        if max_size and file_size > max_size:
            raise ValidationError(
                f"File too large: {file_size} bytes",
                validation_type="file_size",
                expected=f"<= {max_size}",
                actual=file_size,
            )

        # Perform the upload
        with open(filename, 'rb') as f:
            future = transfer_manager.upload(f, bucket, key)
            result = future.result()

        print(f"Upload successful: {filename} -> s3://{bucket}/{key}")
        return result

    except ValidationError as e:
        print(f"Validation failed: {e}")
        if e.validation_type:
            print(f"  Type: {e.validation_type}")
        if e.expected and e.actual:
            print(f"  Expected: {e.expected}, Actual: {e.actual}")
        raise

    except S3UploadFailedError as e:
        print(f"S3 upload failed: {e}")
        raise

    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

# Example usage with custom exceptions
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    validate_and_upload(
        transfer_manager,
        '/tmp/test_file.txt',
        'my-bucket',
        'test_file.txt',
        max_size=10 * 1024 * 1024,  # 10 MB limit
    )
except ValidationError as e:
    print(f"Validation error: {e}")
except Exception as e:
    print(f"Other error: {e}")
finally:
    transfer_manager.shutdown()
```

## Exception Hierarchy

```
Exception
├── S3UploadFailedError
├── S3DownloadFailedError
├── RetriesExceededError
├── TransferNotDoneError
├── InvalidSubscriberMethodError
├── QueueShutdownError
├── RequestExceededException
└── CancelledError
    └── FatalError
```
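Because `FatalError` subclasses `CancelledError`, any handler written for `CancelledError` also catches fatal shutdown errors. A minimal, self-contained sketch of this (using stand-in classes that mirror the hierarchy above rather than the real s3transfer imports):

```python
# Stand-in classes mirroring the hierarchy above (illustration only)
class CancelledError(Exception):
    """Raised when a transfer is cancelled."""

class FatalError(CancelledError):
    """Fatal error that also counts as a cancellation."""

def classify(exc: Exception) -> str:
    """Re-raise and classify an exception using the hierarchy."""
    try:
        raise exc
    except CancelledError:
        # Catches both CancelledError and its subclass FatalError
        return "cancelled (includes fatal errors)"
    except Exception:
        return "other"

print(classify(FatalError("fatal shutdown")))  # cancelled (includes fatal errors)
print(classify(ValueError("bad input")))       # other
```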

## Best Practices

### Exception Handling Strategy

1. **Catch specific exceptions**: Handle known exception types specifically rather than using broad except clauses
2. **Preserve exception context**: Use exception chaining to maintain original error information
3. **Implement proper cleanup**: Use try/finally or context managers for resource cleanup
4. **Log exceptions appropriately**: Include sufficient context for debugging
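Point 2 above can be sketched with `raise ... from e`, which records the original error on `__cause__` so nothing is lost when wrapping (`TransferPipelineError` and `upload_step` are hypothetical names used only for illustration):

```python
class TransferPipelineError(Exception):
    """Hypothetical wrapper exception, used here to illustrate chaining."""

def upload_step():
    # Stand-in for a low-level operation that fails
    raise OSError("disk read failed")

def run_upload():
    try:
        upload_step()
    except OSError as e:
        # 'raise ... from e' preserves the original error as __cause__
        raise TransferPipelineError("upload step failed") from e

try:
    run_upload()
except TransferPipelineError as e:
    print(f"Wrapped: {e}")
    print(f"Original cause: {e.__cause__!r}")
```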

### Retry Logic

1. **Use exponential backoff**: Increase delay between retries to avoid overwhelming services
2. **Set maximum retry limits**: Prevent infinite retry loops
3. **Consider exception types**: Not all exceptions should trigger retries
4. **Add jitter**: Randomize retry timing to avoid thundering herd problems
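Points 1 and 4 combine naturally into a "full jitter" backoff: pick a random delay between zero and a capped exponential bound. A small sketch (the helper name and defaults are illustrative, not part of s3transfer):

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Full-jitter exponential backoff: return a random delay in
    [0, min(cap, base * 2**attempt)] seconds."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

# The upper bound doubles each attempt until it hits the cap
for attempt in range(6):
    bound = min(30.0, 2 ** attempt)
    print(f"attempt {attempt}: delay up to {bound:.1f}s -> {backoff_delay(attempt):.2f}s")
```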

### Resource Management

1. **Clean up on failure**: Remove partial files, close connections, etc.
2. **Handle shutdown gracefully**: Respond appropriately to shutdown signals
3. **Monitor resource usage**: Track and limit resource consumption
4. **Implement circuit breakers**: Stop operations when failure rates are high
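The circuit-breaker idea in point 4 can be sketched as a small wrapper that stops calling a failing operation once consecutive failures cross a threshold, then refuses further calls until a cooldown passes (the class name and thresholds are illustrative, not part of s3transfer):

```python
import time

class CircuitBreaker:
    """Minimal circuit breaker: opens after max_failures consecutive
    failures and rejects calls until reset_after seconds have passed."""

    def __init__(self, max_failures=3, reset_after=30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                raise RuntimeError("circuit open: operation rejected")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # success resets the failure count
        return result

breaker = CircuitBreaker(max_failures=2, reset_after=60.0)

def flaky_transfer():
    raise ConnectionError("transfer failed")

for _ in range(3):
    try:
        breaker.call(flaky_transfer)
    except Exception as e:
        print(f"call failed: {e}")
```

The third call fails fast with "circuit open" instead of hitting the flaky operation again.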

### Monitoring and Debugging

1. **Log exception details**: Include operation context, timing, and parameters
2. **Track exception patterns**: Monitor exception frequency and types
3. **Use structured logging**: Make logs searchable and analyzable
4. **Implement alerting**: Notify operators of critical exceptions