# Exception Handling

Comprehensive exception classes for handling transfer failures, retry exhaustion, coordination errors, and other exceptional conditions in S3 transfer operations.

## Capabilities

### Transfer Operation Exceptions

Core exceptions for S3 transfer operation failures with specific error contexts and recovery information.

```python { .api }
class RetriesExceededError(Exception):
    """
    Raised when the maximum number of retries is exceeded during transfer operations.

    Args:
        last_exception: The final exception that caused the retry failure
    """
    def __init__(self, last_exception): ...

    @property
    def last_exception(self):
        """
        The last exception that occurred before retries were exhausted.

        Returns:
            Exception: The final exception that caused failure
        """

class S3UploadFailedError(Exception):
    """
    Raised when an S3 upload operation fails.

    Typically wraps underlying exceptions from S3 operations or network failures.
    """

class S3DownloadFailedError(Exception):
    """
    Raised when an S3 download operation fails.

    Typically wraps underlying exceptions from S3 operations or network failures.
    """
```

### Coordination and State Exceptions

Exceptions related to transfer coordination, state management, and lifecycle issues.

```python { .api }
class TransferNotDoneError(Exception):
    """
    Raised when attempting transfer operations before completion.

    Occurs when trying to access results or perform operations on transfers
    that haven't finished executing.
    """

class FatalError(CancelledError):
    """
    Fatal error in TransferManager that causes immediate shutdown.

    Inherits from CancelledError to indicate that the transfer was cancelled
    due to a fatal condition.
    """

class InvalidSubscriberMethodError(Exception):
    """
    Raised when a subscriber method is invalid or improperly implemented.

    Args:
        subscriber: The subscriber object with the invalid method
        method_name (str): Name of the invalid method
        reason (str): Description of why the method is invalid
    """
    def __init__(self, subscriber, method_name: str, reason: str): ...
```

### Queue and Resource Exceptions

Exceptions related to queue operations and resource management.

```python { .api }
class QueueShutdownError(Exception):
    """
    Raised when attempting to put items in a shutdown queue.

    Occurs when trying to add items to a ShutdownQueue after it has been
    triggered for shutdown.
    """
```

### Bandwidth and Rate Limiting Exceptions

Exceptions specific to bandwidth management and rate limiting operations.

```python { .api }
class RequestExceededException(Exception):
    """
    Raised when a bandwidth request exceeds the available capacity.

    Args:
        requested_amt (int): Number of bytes that were requested
        retry_time (float): Length of time, in seconds, to wait before retrying
    """
    def __init__(self, requested_amt: int, retry_time: float): ...

    @property
    def requested_amt(self) -> int:
        """Number of bytes that were requested."""

    @property
    def retry_time(self) -> float:
        """Length of time, in seconds, to wait before retrying the request."""
```
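
As a rough illustration of how a caller might honor `retry_time`, the sketch below waits for the reported amount and then retries. It assumes `retry_time` is a wait duration in seconds rather than a timestamp. `RequestExceededException` here is a local stand-in with the same two attributes, and `TinyBucket` and `consume_with_wait` are hypothetical helpers, not part of s3transfer.

```python
import time

class RequestExceededException(Exception):
    """Local stand-in exposing the two attributes documented above."""
    def __init__(self, requested_amt, retry_time):
        super().__init__(f"requested {requested_amt} bytes; retry in {retry_time}s")
        self.requested_amt = requested_amt
        self.retry_time = retry_time

class TinyBucket:
    """Hypothetical toy limiter: allows max_rate bytes per one-second window."""
    def __init__(self, max_rate, clock=time.monotonic):
        self.max_rate = max_rate
        self._clock = clock
        self._window_start = clock()
        self._used = 0

    def consume(self, amt):
        if amt > self.max_rate:
            raise ValueError("request can never fit in a single window")
        now = self._clock()
        if now - self._window_start >= 1.0:
            # A new window has started: reset the byte counter
            self._window_start, self._used = now, 0
        if self._used + amt > self.max_rate:
            # Report how long the caller should wait (a duration, not a timestamp)
            raise RequestExceededException(amt, 1.0 - (now - self._window_start))
        self._used += amt
        return amt

def consume_with_wait(bucket, amt, sleep=time.sleep):
    """Retry consume() after sleeping for the duration the exception reports."""
    while True:
        try:
            return bucket.consume(amt)
        except RequestExceededException as e:
            sleep(e.retry_time)
```

The `clock` and `sleep` parameters are injectable only so the behavior can be exercised without real delays.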

## Usage Examples

### Basic Exception Handling

```python
from s3transfer.manager import TransferManager
from s3transfer.exceptions import (
    S3UploadFailedError, S3DownloadFailedError,
    TransferNotDoneError, RetriesExceededError
)
import boto3

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Upload with comprehensive error handling
    with open('/tmp/test_file.txt', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')

        try:
            # Wait for completion
            result = future.result()
            print("Upload completed successfully!")

        except S3UploadFailedError as e:
            print(f"Upload failed: {e}")
            # Handle upload-specific failure

        except TransferNotDoneError as e:
            print(f"Transfer not complete: {e}")
            # Handle premature access to results

        except RetriesExceededError as e:
            print(f"Retries exhausted: {e}")
            print(f"Last exception: {e.last_exception}")
            # Handle retry exhaustion

        except Exception as e:
            print(f"Unexpected error: {e}")
            # Handle any other exceptions

finally:
    transfer_manager.shutdown()
```

### Retry Logic with Exception Handling

```python
import time
import random

import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

def upload_with_custom_retry(transfer_manager, fileobj, bucket, key, max_retries=3):
    """Upload with custom retry logic and exception handling."""

    for attempt in range(max_retries + 1):
        try:
            future = transfer_manager.upload(fileobj, bucket, key)
            result = future.result()
            print(f"Upload succeeded on attempt {attempt + 1}")
            return result

        except S3UploadFailedError as e:
            print(f"Upload attempt {attempt + 1} failed: {e}")

            if attempt < max_retries:
                # Exponential backoff with jitter
                delay = (2 ** attempt) + random.uniform(0, 1)
                print(f"Retrying in {delay:.2f} seconds...")
                time.sleep(delay)

                # Reset file position for retry
                fileobj.seek(0)
            else:
                print("All retry attempts exhausted")
                raise

        except RetriesExceededError as e:
            print(f"Internal retries exhausted on attempt {attempt + 1}")
            print(f"Last internal exception: {e.last_exception}")

            if attempt < max_retries:
                delay = (2 ** attempt) + random.uniform(0, 1)
                print(f"Retrying entire operation in {delay:.2f} seconds...")
                time.sleep(delay)
                fileobj.seek(0)
            else:
                raise

        except Exception as e:
            print(f"Unexpected error on attempt {attempt + 1}: {e}")
            if attempt >= max_retries:
                raise
            # Reset file position before the next attempt
            fileobj.seek(0)

# Use custom retry logic
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    with open('/tmp/test_file.txt', 'rb') as f:
        upload_with_custom_retry(transfer_manager, f, 'my-bucket', 'test_file.txt')

finally:
    transfer_manager.shutdown()
```

### Handling Download Exceptions

```python
import os

import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import S3DownloadFailedError, TransferNotDoneError

def safe_download(transfer_manager, bucket, key, filename):
    """Download with comprehensive error handling and cleanup."""

    temp_filename = filename + '.tmp'

    try:
        # Download to temporary file first
        with open(temp_filename, 'wb') as f:
            future = transfer_manager.download(bucket, key, f)
            result = future.result()

        # Verify download completed successfully
        if os.path.getsize(temp_filename) == 0:
            raise S3DownloadFailedError("Downloaded file is empty")

        # Move temporary file to final location (overwrites an existing file)
        os.replace(temp_filename, filename)
        print(f"Download completed: {filename}")
        return True

    except S3DownloadFailedError as e:
        print(f"Download failed: {e}")
        # Clean up temporary file
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        return False

    except TransferNotDoneError as e:
        print(f"Download not complete: {e}")
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        return False

    except OSError as e:
        print(f"File system error: {e}")
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        return False

    except Exception as e:
        print(f"Unexpected download error: {e}")
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        raise

# Use safe download
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    success = safe_download(transfer_manager, 'my-bucket', 'test_file.txt', '/tmp/downloaded.txt')
    if success:
        print("Download completed successfully")
    else:
        print("Download failed")

finally:
    transfer_manager.shutdown()
```

### Bandwidth Exception Handling

```python
from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket, RequestExceededException
from s3transfer.futures import TransferCoordinator
import time

def handle_bandwidth_limited_operation():
    """Handle bandwidth-limited operations with proper exception handling."""

    # Create very restrictive bandwidth limiter for demonstration
    max_rate = 1024  # 1KB/s
    leaky_bucket = LeakyBucket(max_rate)
    bandwidth_limiter = BandwidthLimiter(leaky_bucket)

    class TestStream:
        def __init__(self, data):
            self.data = data
            self.position = 0

        def read(self, amount=None):
            if amount is None:
                amount = len(self.data) - self.position
            end = min(self.position + amount, len(self.data))
            result = self.data[self.position:end]
            self.position = end
            return result

    # Create bandwidth-limited stream
    test_data = b'x' * 10240  # 10KB
    stream = TestStream(test_data)
    coordinator = TransferCoordinator()

    # Note: the method name is spelled "bandwith" in the library
    limited_stream = bandwidth_limiter.get_bandwith_limited_stream(stream, coordinator)

    total_read = 0
    retries = 0
    max_retries = 10

    print("Starting bandwidth-limited read with exception handling...")

    while total_read < len(test_data) and retries < max_retries:
        try:
            # Try to read data. The limited stream normally waits internally
            # when the rate is exceeded, so the handler below is defensive.
            chunk = limited_stream.read(2048)  # Try to read 2KB

            if chunk:
                total_read += len(chunk)
                print(f"Read {len(chunk)} bytes (total: {total_read})")
            else:
                break

        except RequestExceededException as e:
            retries += 1
            print(f"Bandwidth limit exceeded: requested {e.requested_amt} bytes")
            print(f"Retry after waiting {e.retry_time:.2f} seconds")

            # retry_time is a duration to wait, not an absolute timestamp
            wait_time = e.retry_time
            if wait_time > 0:
                print(f"Waiting {wait_time:.2f} seconds...")
                time.sleep(wait_time)

            print(f"Retry attempt {retries}")

        except Exception as e:
            print(f"Unexpected bandwidth error: {e}")
            break

    if retries >= max_retries:
        print("Maximum retries exceeded for bandwidth limiting")
    else:
        print(f"Completed reading {total_read} bytes with {retries} retries")

# Run bandwidth exception handling example
handle_bandwidth_limited_operation()
```

### Queue Exception Handling

```python
from s3transfer import ShutdownQueue, QueueShutdownError
import queue
import threading
import time

def demonstrate_queue_exception_handling():
    """Demonstrate handling of queue shutdown exceptions."""

    # Create shutdown queue (named to avoid shadowing the queue module)
    shutdown_queue = ShutdownQueue(maxsize=10)

    def producer_thread():
        """Producer that handles queue shutdown gracefully."""
        try:
            for i in range(20):
                try:
                    item = f"item_{i}"
                    # ShutdownQueue.put() takes only the item (no timeout argument)
                    shutdown_queue.put(item)
                    print(f"Produced: {item}")
                    time.sleep(0.1)

                except QueueShutdownError:
                    print("Producer: Queue has been shutdown, stopping production")
                    break

                except Exception as e:
                    print(f"Producer error: {e}")
                    break

        except Exception as e:
            print(f"Producer thread error: {e}")

    def consumer_thread():
        """Consumer that processes items until the queue stays empty."""
        consumed_count = 0

        try:
            while True:
                try:
                    item = shutdown_queue.get(timeout=2)
                    print(f"Consumed: {item}")
                    consumed_count += 1
                    time.sleep(0.2)

                except queue.Empty:
                    # Nothing arrived within the timeout
                    print("Consumer: No more items")
                    break

        except Exception as e:
            print(f"Consumer thread error: {e}")

        finally:
            print(f"Consumer processed {consumed_count} items")

    # Start producer and consumer threads
    producer = threading.Thread(target=producer_thread)
    consumer = threading.Thread(target=consumer_thread)

    producer.start()
    consumer.start()

    # Let them run for a bit, then shutdown queue
    time.sleep(1)
    print("Triggering queue shutdown...")
    shutdown_queue.trigger_shutdown()

    # Wait for threads to complete
    producer.join()
    consumer.join()

    # Try to put item after shutdown (will raise exception)
    try:
        shutdown_queue.put("post_shutdown_item")
    except QueueShutdownError:
        print("Correctly caught QueueShutdownError after shutdown")

# Run queue exception handling example
demonstrate_queue_exception_handling()
```

### Exception Logging and Monitoring

```python
import logging
import time
from datetime import datetime

import boto3
from s3transfer.bandwidth import RequestExceededException
from s3transfer.exceptions import (
    FatalError, InvalidSubscriberMethodError, RetriesExceededError,
    S3DownloadFailedError, S3UploadFailedError, TransferNotDoneError,
)
from s3transfer.manager import TransferManager

class TransferExceptionHandler:
    """Centralized exception handler for transfer operations."""

    def __init__(self, logger_name="s3transfer_exceptions"):
        self.logger = logging.getLogger(logger_name)
        self.exception_counts = {}
        self.last_exception_time = {}

    def handle_exception(self, exception, operation="unknown", **context):
        """Handle and log transfer exceptions with context."""

        exception_type = type(exception).__name__
        current_time = datetime.now()

        # Update statistics
        self.exception_counts[exception_type] = self.exception_counts.get(exception_type, 0) + 1
        self.last_exception_time[exception_type] = current_time

        # Create detailed log entry
        log_data = {
            'exception_type': exception_type,
            'exception_message': str(exception),
            'operation': operation,
            'timestamp': current_time.isoformat(),
            'count': self.exception_counts[exception_type],
            **context
        }

        # Log based on exception type
        if isinstance(exception, (S3UploadFailedError, S3DownloadFailedError)):
            self.logger.error(f"S3 operation failed: {exception}", extra=log_data)

        elif isinstance(exception, RetriesExceededError):
            self.logger.error(f"Retries exhausted: {exception}", extra=log_data)
            if hasattr(exception, 'last_exception') and exception.last_exception:
                self.logger.error(f"Last retry exception: {exception.last_exception}")

        elif isinstance(exception, TransferNotDoneError):
            self.logger.warning(f"Premature access attempt: {exception}", extra=log_data)

        elif isinstance(exception, FatalError):
            self.logger.critical(f"Fatal transfer error: {exception}", extra=log_data)

        elif isinstance(exception, RequestExceededException):
            self.logger.info(f"Bandwidth limit exceeded: {exception}", extra=log_data)

        else:
            self.logger.error(f"Unexpected exception: {exception}", extra=log_data)

        # Log stack trace for debugging; passing the exception works even
        # when this method is called outside an except block
        self.logger.debug("Stack trace:", exc_info=exception)

        return log_data

    def get_exception_summary(self):
        """Get summary of handled exceptions."""
        return {
            'exception_counts': dict(self.exception_counts),
            'last_exception_times': {
                k: v.isoformat() for k, v in self.last_exception_time.items()
            }
        }

    def should_retry(self, exception, attempt_count, max_attempts=3):
        """Determine if operation should be retried based on exception type."""

        if attempt_count >= max_attempts:
            return False

        # Don't retry fatal errors
        if isinstance(exception, FatalError):
            return False

        # Don't retry invalid operations
        if isinstance(exception, (TransferNotDoneError, InvalidSubscriberMethodError)):
            return False

        # Retry network and S3 errors
        if isinstance(exception, (S3UploadFailedError, S3DownloadFailedError, RetriesExceededError)):
            return True

        # Retry bandwidth limitations with delay
        if isinstance(exception, RequestExceededException):
            return True

        # Conservative approach for unknown exceptions
        return False

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Use exception handler
exception_handler = TransferExceptionHandler()

def robust_transfer_operation(transfer_manager, operation_func, max_attempts=3):
    """Perform transfer operation with comprehensive exception handling."""

    for attempt in range(max_attempts):
        try:
            result = operation_func()
            return result

        except Exception as e:
            # Handle exception with context
            context = {
                'attempt': attempt + 1,
                'max_attempts': max_attempts,
                'operation_func': getattr(operation_func, '__name__', 'unknown')
            }

            exception_handler.handle_exception(e, "transfer_operation", **context)

            # Decide whether to retry
            if exception_handler.should_retry(e, attempt, max_attempts):
                if attempt < max_attempts - 1:
                    delay = 2 ** attempt  # Exponential backoff
                    print(f"Retrying in {delay} seconds (attempt {attempt + 2}/{max_attempts})")
                    time.sleep(delay)
                    continue

            # Re-raise if not retrying or max attempts reached
            raise

    # This shouldn't be reached, but just in case
    raise RuntimeError("Maximum attempts reached without success")

# Example usage
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    def upload_operation():
        with open('/tmp/test_file.txt', 'rb') as f:
            future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')
            return future.result()

    # Perform operation with exception handling
    result = robust_transfer_operation(transfer_manager, upload_operation)
    print("Operation completed successfully!")

    # Print exception summary
    summary = exception_handler.get_exception_summary()
    if summary['exception_counts']:
        print("Exception summary:", summary)

except Exception as e:
    print(f"Final failure: {e}")

finally:
    transfer_manager.shutdown()
```

### Custom Exception Classes

```python
import os

from s3transfer.exceptions import S3UploadFailedError

class CustomTransferError(Exception):
    """Base class for custom transfer exceptions."""
    pass

class ValidationError(CustomTransferError):
    """Raised when transfer validation fails."""

    def __init__(self, message, validation_type=None, expected=None, actual=None):
        super().__init__(message)
        self.validation_type = validation_type
        self.expected = expected
        self.actual = actual

class QuotaExceededError(CustomTransferError):
    """Raised when transfer quota is exceeded."""

    def __init__(self, message, quota_type=None, limit=None, current=None):
        super().__init__(message)
        self.quota_type = quota_type
        self.limit = limit
        self.current = current

def validate_and_upload(transfer_manager, filename, bucket, key, max_size=None):
    """Upload with custom validation and exception handling."""

    try:
        # Validate file exists and is readable (checked first, because
        # os.path.getsize raises OSError for a missing file)
        if not os.path.isfile(filename):
            raise ValidationError(f"File not found: {filename}", validation_type="file_existence")

        if not os.access(filename, os.R_OK):
            raise ValidationError(f"File not readable: {filename}", validation_type="file_permissions")

        # Validate file size
        file_size = os.path.getsize(filename)
        if max_size and file_size > max_size:
            raise ValidationError(
                f"File too large: {file_size} bytes",
                validation_type="file_size",
                expected=f"<= {max_size}",
                actual=file_size
            )

        # Perform upload
        with open(filename, 'rb') as f:
            future = transfer_manager.upload(f, bucket, key)
            result = future.result()

        print(f"Upload successful: {filename} -> s3://{bucket}/{key}")
        return result

    except ValidationError as e:
        print(f"Validation failed: {e}")
        if e.validation_type:
            print(f"  Type: {e.validation_type}")
        if e.expected is not None and e.actual is not None:
            print(f"  Expected: {e.expected}, Actual: {e.actual}")
        raise

    except S3UploadFailedError as e:
        print(f"S3 upload failed: {e}")
        raise

    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

# Example usage with custom exceptions
# (assumes transfer_manager was created as in the earlier examples)
try:
    validate_and_upload(
        transfer_manager,
        '/tmp/test_file.txt',
        'my-bucket',
        'test_file.txt',
        max_size=10 * 1024 * 1024  # 10MB limit
    )
except ValidationError as e:
    print(f"Validation error: {e}")
except Exception as e:
    print(f"Other error: {e}")
```

## Exception Hierarchy

```
Exception
├── S3UploadFailedError
├── S3DownloadFailedError
├── RetriesExceededError
├── TransferNotDoneError
├── InvalidSubscriberMethodError
├── QueueShutdownError
├── RequestExceededException
└── CancelledError
    └── FatalError
```
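
Because `FatalError` derives from `CancelledError`, the order of `except` clauses matters: a handler for `CancelledError` placed first would also swallow `FatalError`. The sketch below illustrates this with local stand-in classes that mirror the hierarchy, so it runs without s3transfer installed. The `classify` helper is hypothetical.

```python
# Stand-ins mirroring the documented hierarchy; not imported from s3transfer.
class CancelledError(Exception):
    pass

class FatalError(CancelledError):
    pass

def classify(exc):
    """Hypothetical helper: report which except clause handles exc."""
    try:
        raise exc
    except FatalError:
        # Most specific subclass first
        return "fatal"
    except CancelledError:
        return "cancelled"
    except Exception:
        return "other"

print(classify(FatalError("manager shut down")))
print(classify(CancelledError("cancelled by caller")))
print(classify(ValueError("bad input")))
```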

## Best Practices

### Exception Handling Strategy

1. **Catch specific exceptions**: Handle known exception types specifically rather than using broad except clauses
2. **Preserve exception context**: Use exception chaining to maintain original error information
3. **Implement proper cleanup**: Use try/finally or context managers for resource cleanup
4. **Log exceptions appropriately**: Include sufficient context for debugging
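
Exception chaining, mentioned in the list above, can be sketched with Python's `raise ... from ...` syntax. `UploadPipelineError` and `read_payload` are hypothetical names used only for illustration.

```python
class UploadPipelineError(Exception):
    """Hypothetical application-level error; not part of s3transfer."""

def read_payload(path):
    """Read a file, wrapping low-level errors in an application error."""
    try:
        with open(path, "rb") as f:
            return f.read()
    except OSError as e:
        # "from e" stores the original error on __cause__, so the
        # traceback shows both the wrapper and the root cause
        raise UploadPipelineError(f"could not read {path}") from e

try:
    read_payload("/nonexistent/path/for/demo")
except UploadPipelineError as e:
    print(f"{e} (caused by {type(e.__cause__).__name__})")
```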

### Retry Logic

1. **Use exponential backoff**: Increase delay between retries to avoid overwhelming services
2. **Set maximum retry limits**: Prevent infinite retry loops
3. **Consider exception types**: Not all exceptions should trigger retries
4. **Add jitter**: Randomize retry timing to avoid thundering herd problems
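
The four points above can be combined in one small helper. This is a minimal sketch, not a recommended production implementation: `backoff_delay` and `retry_call` are hypothetical names, the default `retryable` tuple is an assumption, and `sleep` is injectable only so the helper can be exercised without real delays.

```python
import random
import time

def backoff_delay(attempt, base=1.0, cap=30.0):
    """Capped exponential delay with full jitter (uniform in [0, limit])."""
    limit = min(cap, base * (2 ** attempt))
    return random.uniform(0, limit)

def retry_call(func, retryable=(ConnectionError, TimeoutError),
               max_attempts=4, sleep=time.sleep):
    """Call func(), retrying only the exception types listed in retryable."""
    for attempt in range(max_attempts):
        try:
            return func()
        except retryable:
            if attempt == max_attempts - 1:
                raise  # Retry limit reached: surface the last error
            sleep(backoff_delay(attempt))
```

Exceptions outside `retryable` propagate immediately, which keeps non-transient failures from being retried.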

### Resource Management

1. **Clean up on failure**: Remove partial files, close connections, etc.
2. **Handle shutdown gracefully**: Respond appropriately to shutdown signals
3. **Monitor resource usage**: Track and limit resource consumption
4. **Implement circuit breakers**: Stop operations when failure rates are high
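
A circuit breaker, as mentioned in the last item, can be sketched in a few lines. The version below counts consecutive failures, which is one simple policy among several. `CircuitBreaker` and `CircuitOpenError` are hypothetical names, and the thresholds are arbitrary defaults.

```python
import time

class CircuitOpenError(Exception):
    """Hypothetical error raised while the breaker refuses calls."""

class CircuitBreaker:
    """Open after N consecutive failures; allow a trial call after a timeout."""

    def __init__(self, failure_threshold=3, reset_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None

    def call(self, func):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; skipping call")
            # Timeout elapsed: permit one trial call (half-open state).
            # A single further failure reopens the circuit.
            self._opened_at = None
            self._failures = self.failure_threshold - 1
        try:
            result = func()
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        self._failures = 0  # Any success closes the circuit fully
        return result
```

A caller would wrap each transfer attempt in `breaker.call(...)` and treat `CircuitOpenError` as a signal to stop submitting work for a while.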

### Monitoring and Debugging

1. **Log exception details**: Include operation context, timing, and parameters
2. **Track exception patterns**: Monitor exception frequency and types
3. **Use structured logging**: Make logs searchable and analyzable
4. **Implement alerting**: Notify operators of critical exceptions
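
One way to approach structured logging is a formatter that emits one JSON object per record. The sketch below uses only the standard library. `JsonFormatter`, its `CONTEXT_FIELDS` tuple, and the `transfer_demo` logger name are assumptions chosen for illustration.

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Hypothetical formatter: one JSON object per log record."""

    # Context keys copied from the record when supplied via extra=...
    CONTEXT_FIELDS = ("operation", "bucket", "key", "attempt")

    def format(self, record):
        entry = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for field in self.CONTEXT_FIELDS:
            if hasattr(record, field):
                entry[field] = getattr(record, field)
        if record.exc_info:
            entry["exception_type"] = record.exc_info[0].__name__
        return json.dumps(entry)

logger = logging.getLogger("transfer_demo")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

try:
    raise TimeoutError("read timed out")
except TimeoutError:
    logger.error("upload failed", exc_info=True,
                 extra={"operation": "upload", "bucket": "my-bucket", "attempt": 2})
```

Keeping the context in named fields rather than in the message string makes it easier to filter by operation or bucket in a log aggregator.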