An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.
—
Comprehensive exception classes for handling transfer failures, retry exhaustion, coordination errors, and other exceptional conditions in S3 transfer operations.
Core exceptions for S3 transfer operation failures with specific error contexts and recovery information.
class RetriesExceededError(Exception):
"""
Raised when maximum number of retries exceeded during transfer operations.
Args:
last_exception: The final exception that caused the retry failure
"""
def __init__(self, last_exception): ...
@property
def last_exception(self):
"""
The last exception that occurred before retries were exhausted.
Returns:
Exception: The final exception that caused failure
"""
class S3UploadFailedError(Exception):
"""
Raised when S3 upload operation fails.
Typically wraps underlying exceptions from S3 operations or network failures.
"""
class S3DownloadFailedError(Exception):
"""
Raised when S3 download operation fails.
Typically wraps underlying exceptions from S3 operations or network failures.
"""Exceptions related to transfer coordination, state management, and lifecycle issues.
class TransferNotDoneError(Exception):
"""
Raised when attempting transfer operations before completion.
Occurs when trying to access results or perform operations on transfers
that haven't finished executing.
"""
class FatalError(CancelledError):
"""
Fatal error in TransferManager that causes immediate shutdown.
Inherits from CancelledError to indicate that the transfer was cancelled
due to a fatal condition.
"""
class InvalidSubscriberMethodError(Exception):
"""
Raised when subscriber method is invalid or improperly implemented.
Args:
subscriber: The subscriber object with invalid method
method_name (str): Name of the invalid method
reason (str): Description of why the method is invalid
"""
def __init__(self, subscriber, method_name: str, reason: str): ...Exceptions related to queue operations and resource management.
class QueueShutdownError(Exception):
"""
Raised when attempting to put items in a shutdown queue.
Occurs when trying to add items to a ShutdownQueue after it has been
triggered for shutdown.
"""Exceptions specific to bandwidth management and rate limiting operations.
class RequestExceededException(Exception):
"""
Raised when bandwidth request exceeds available capacity.
Args:
requested_amt (int): Number of bytes that were requested
retry_time (float): Time when request can be retried
"""
def __init__(self, requested_amt: int, retry_time: float): ...
@property
def requested_amt(self) -> int:
"""Number of bytes that were requested."""
@property
def retry_time(self) -> float:
"""Time when request can be retried."""from s3transfer.manager import TransferManager
from s3transfer.exceptions import (
S3UploadFailedError, S3DownloadFailedError,
TransferNotDoneError, RetriesExceededError
)
import boto3
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
# Upload with comprehensive error handling
with open('/tmp/test_file.txt', 'rb') as f:
future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')
try:
# Wait for completion
result = future.result()
print("Upload completed successfully!")
except S3UploadFailedError as e:
print(f"Upload failed: {e}")
# Handle upload-specific failure
except TransferNotDoneError as e:
print(f"Transfer not complete: {e}")
# Handle premature access to results
except RetriesExceededError as e:
print(f"Retries exhausted: {e}")
print(f"Last exception: {e.last_exception}")
# Handle retry exhaustion
except Exception as e:
print(f"Unexpected error: {e}")
# Handle any other exceptions
finally:
transfer_manager.shutdown()import time
import random
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError
def upload_with_custom_retry(transfer_manager, fileobj, bucket, key, max_retries=3):
"""Upload with custom retry logic and exception handling."""
for attempt in range(max_retries + 1):
try:
future = transfer_manager.upload(fileobj, bucket, key)
result = future.result()
print(f"Upload succeeded on attempt {attempt + 1}")
return result
except S3UploadFailedError as e:
print(f"Upload attempt {attempt + 1} failed: {e}")
if attempt < max_retries:
# Exponential backoff with jitter
delay = (2 ** attempt) + random.uniform(0, 1)
print(f"Retrying in {delay:.2f} seconds...")
time.sleep(delay)
# Reset file position for retry
fileobj.seek(0)
else:
print("All retry attempts exhausted")
raise
except RetriesExceededError as e:
print(f"Internal retries exhausted on attempt {attempt + 1}")
print(f"Last internal exception: {e.last_exception}")
if attempt < max_retries:
delay = (2 ** attempt) + random.uniform(0, 1)
print(f"Retrying entire operation in {delay:.2f} seconds...")
time.sleep(delay)
fileobj.seek(0)
else:
raise
except Exception as e:
print(f"Unexpected error on attempt {attempt + 1}: {e}")
if attempt >= max_retries:
raise
# Use custom retry logic
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
with open('/tmp/test_file.txt', 'rb') as f:
upload_with_custom_retry(transfer_manager, f, 'my-bucket', 'test_file.txt')
finally:
transfer_manager.shutdown()from s3transfer.exceptions import S3DownloadFailedError
import os
def safe_download(transfer_manager, bucket, key, filename):
"""Download with comprehensive error handling and cleanup."""
temp_filename = filename + '.tmp'
try:
# Download to temporary file first
with open(temp_filename, 'wb') as f:
future = transfer_manager.download(bucket, key, f)
result = future.result()
# Verify download completed successfully
if os.path.getsize(temp_filename) == 0:
raise S3DownloadFailedError("Downloaded file is empty")
# Move temporary file to final location
os.rename(temp_filename, filename)
print(f"Download completed: {filename}")
return True
except S3DownloadFailedError as e:
print(f"Download failed: {e}")
# Clean up temporary file
if os.path.exists(temp_filename):
os.remove(temp_filename)
return False
except TransferNotDoneError as e:
print(f"Download not complete: {e}")
if os.path.exists(temp_filename):
os.remove(temp_filename)
return False
except OSError as e:
print(f"File system error: {e}")
if os.path.exists(temp_filename):
os.remove(temp_filename)
return False
except Exception as e:
print(f"Unexpected download error: {e}")
if os.path.exists(temp_filename):
os.remove(temp_filename)
raise
# Use safe download
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
success = safe_download(transfer_manager, 'my-bucket', 'test_file.txt', '/tmp/downloaded.txt')
if success:
print("Download completed successfully")
else:
print("Download failed")
finally:
transfer_manager.shutdown()from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket, RequestExceededException
import time
def handle_bandwidth_limited_operation():
"""Handle bandwidth-limited operations with proper exception handling."""
# Create very restrictive bandwidth limiter for demonstration
max_rate = 1024 # 1KB/s
leaky_bucket = LeakyBucket(max_rate)
bandwidth_limiter = BandwidthLimiter(leaky_bucket)
class TestStream:
def __init__(self, data):
self.data = data
self.position = 0
def read(self, amount=None):
if amount is None:
amount = len(self.data) - self.position
end = min(self.position + amount, len(self.data))
result = self.data[self.position:end]
self.position = end
return result
# Create bandwidth-limited stream
test_data = b'x' * 10240 # 10KB
stream = TestStream(test_data)
coordinator = TransferCoordinator()
limited_stream = bandwidth_limiter.get_bandwith_limited_stream(stream, coordinator)
total_read = 0
retries = 0
max_retries = 10
print("Starting bandwidth-limited read with exception handling...")
while total_read < len(test_data) and retries < max_retries:
try:
# Try to read data
chunk = limited_stream.read(2048) # Try to read 2KB
if chunk:
total_read += len(chunk)
print(f"Read {len(chunk)} bytes (total: {total_read})")
else:
break
except RequestExceededException as e:
retries += 1
print(f"Bandwidth limit exceeded: requested {e.requested_amt} bytes")
print(f"Can retry after: {e.retry_time}")
# Calculate wait time
wait_time = e.retry_time - time.time()
if wait_time > 0:
print(f"Waiting {wait_time:.2f} seconds...")
time.sleep(wait_time)
print(f"Retry attempt {retries}")
except Exception as e:
print(f"Unexpected bandwidth error: {e}")
break
if retries >= max_retries:
print("Maximum retries exceeded for bandwidth limiting")
else:
print(f"Completed reading {total_read} bytes with {retries} retries")
# Run bandwidth exception handling example
handle_bandwidth_limited_operation()from s3transfer import ShutdownQueue, QueueShutdownError
import threading
import time
def demonstrate_queue_exception_handling():
"""Demonstrate handling of queue shutdown exceptions."""
# Create shutdown queue
queue = ShutdownQueue(maxsize=10)
def producer_thread():
"""Producer that handles queue shutdown gracefully."""
try:
for i in range(20):
try:
item = f"item_{i}"
queue.put(item, timeout=1)
print(f"Produced: {item}")
time.sleep(0.1)
except QueueShutdownError:
print("Producer: Queue has been shutdown, stopping production")
break
except Exception as e:
print(f"Producer error: {e}")
break
except Exception as e:
print(f"Producer thread error: {e}")
def consumer_thread():
"""Consumer that processes items until queue is shutdown."""
consumed_count = 0
try:
while True:
try:
item = queue.get(timeout=2)
print(f"Consumed: {item}")
consumed_count += 1
time.sleep(0.2)
except: # Queue empty or other error
print("Consumer: No more items or queue error")
break
except Exception as e:
print(f"Consumer thread error: {e}")
finally:
print(f"Consumer processed {consumed_count} items")
# Start producer and consumer threads
producer = threading.Thread(target=producer_thread)
consumer = threading.Thread(target=consumer_thread)
producer.start()
consumer.start()
# Let them run for a bit, then shutdown queue
time.sleep(1)
print("Triggering queue shutdown...")
queue.trigger_shutdown()
# Wait for threads to complete
producer.join()
consumer.join()
# Try to put item after shutdown (will raise exception)
try:
queue.put("post_shutdown_item")
except QueueShutdownError:
print("Correctly caught QueueShutdownError after shutdown")
# Run queue exception handling example
demonstrate_queue_exception_handling()import logging
import traceback
from datetime import datetime
from s3transfer.exceptions import *
class TransferExceptionHandler:
"""Centralized exception handler for transfer operations."""
def __init__(self, logger_name="s3transfer_exceptions"):
self.logger = logging.getLogger(logger_name)
self.exception_counts = {}
self.last_exception_time = {}
def handle_exception(self, exception, operation="unknown", **context):
"""Handle and log transfer exceptions with context."""
exception_type = type(exception).__name__
current_time = datetime.now()
# Update statistics
self.exception_counts[exception_type] = self.exception_counts.get(exception_type, 0) + 1
self.last_exception_time[exception_type] = current_time
# Create detailed log entry
log_data = {
'exception_type': exception_type,
'exception_message': str(exception),
'operation': operation,
'timestamp': current_time.isoformat(),
'count': self.exception_counts[exception_type],
**context
}
# Log based on exception type
if isinstance(exception, (S3UploadFailedError, S3DownloadFailedError)):
self.logger.error(f"S3 operation failed: {exception}", extra=log_data)
elif isinstance(exception, RetriesExceededError):
self.logger.error(f"Retries exhausted: {exception}", extra=log_data)
if hasattr(exception, 'last_exception') and exception.last_exception:
self.logger.error(f"Last retry exception: {exception.last_exception}")
elif isinstance(exception, TransferNotDoneError):
self.logger.warning(f"Premature access attempt: {exception}", extra=log_data)
elif isinstance(exception, FatalError):
self.logger.critical(f"Fatal transfer error: {exception}", extra=log_data)
elif isinstance(exception, RequestExceededException):
self.logger.info(f"Bandwidth limit exceeded: {exception}", extra=log_data)
else:
self.logger.error(f"Unexpected exception: {exception}", extra=log_data)
# Log stack trace for debugging
self.logger.debug("Stack trace:", exc_info=True)
return log_data
def get_exception_summary(self):
"""Get summary of handled exceptions."""
return {
'exception_counts': dict(self.exception_counts),
'last_exception_times': {
k: v.isoformat() for k, v in self.last_exception_time.items()
}
}
def should_retry(self, exception, attempt_count, max_attempts=3):
"""Determine if operation should be retried based on exception type."""
if attempt_count >= max_attempts:
return False
# Don't retry fatal errors
if isinstance(exception, FatalError):
return False
# Don't retry invalid operations
if isinstance(exception, (TransferNotDoneError, InvalidSubscriberMethodError)):
return False
# Retry network and S3 errors
if isinstance(exception, (S3UploadFailedError, S3DownloadFailedError, RetriesExceededError)):
return True
# Retry bandwidth limitations with delay
if isinstance(exception, RequestExceededException):
return True
# Conservative approach for unknown exceptions
return False
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Use exception handler
exception_handler = TransferExceptionHandler()
def robust_transfer_operation(transfer_manager, operation_func, max_attempts=3):
"""Perform transfer operation with comprehensive exception handling."""
for attempt in range(max_attempts):
try:
result = operation_func()
return result
except Exception as e:
# Handle exception with context
context = {
'attempt': attempt + 1,
'max_attempts': max_attempts,
'operation_func': operation_func.__name__ if hasattr(operation_func, '__name__') else 'unknown'
}
exception_handler.handle_exception(e, "transfer_operation", **context)
# Decide whether to retry
if exception_handler.should_retry(e, attempt, max_attempts):
if attempt < max_attempts - 1:
delay = 2 ** attempt # Exponential backoff
print(f"Retrying in {delay} seconds (attempt {attempt + 2}/{max_attempts})")
time.sleep(delay)
continue
# Re-raise if not retrying or max attempts reached
raise e
# This shouldn't be reached, but just in case
raise Exception("Maximum attempts reached without success")
# Example usage
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
def upload_operation():
with open('/tmp/test_file.txt', 'rb') as f:
future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')
return future.result()
# Perform operation with exception handling
result = robust_transfer_operation(transfer_manager, upload_operation)
print("Operation completed successfully!")
# Print exception summary
summary = exception_handler.get_exception_summary()
if summary['exception_counts']:
print("Exception summary:", summary)
except Exception as e:
print(f"Final failure: {e}")
finally:
transfer_manager.shutdown()from s3transfer.exceptions import S3UploadFailedError
class CustomTransferError(Exception):
"""Base class for custom transfer exceptions."""
pass
class ValidationError(CustomTransferError):
"""Raised when transfer validation fails."""
def __init__(self, message, validation_type=None, expected=None, actual=None):
super().__init__(message)
self.validation_type = validation_type
self.expected = expected
self.actual = actual
class QuotaExceededError(CustomTransferError):
"""Raised when transfer quota is exceeded."""
def __init__(self, message, quota_type=None, limit=None, current=None):
super().__init__(message)
self.quota_type = quota_type
self.limit = limit
self.current = current
def validate_and_upload(transfer_manager, filename, bucket, key, max_size=None):
"""Upload with custom validation and exception handling."""
try:
# Validate file size
file_size = os.path.getsize(filename)
if max_size and file_size > max_size:
raise ValidationError(
f"File too large: {file_size} bytes",
validation_type="file_size",
expected=f"<= {max_size}",
actual=file_size
)
# Validate file exists and is readable
if not os.path.isfile(filename):
raise ValidationError(f"File not found: {filename}", validation_type="file_existence")
if not os.access(filename, os.R_OK):
raise ValidationError(f"File not readable: {filename}", validation_type="file_permissions")
# Perform upload
with open(filename, 'rb') as f:
future = transfer_manager.upload(f, bucket, key)
result = future.result()
print(f"Upload successful: {filename} -> s3://{bucket}/{key}")
return result
except ValidationError as e:
print(f"Validation failed: {e}")
if e.validation_type:
print(f" Type: {e.validation_type}")
if e.expected and e.actual:
print(f" Expected: {e.expected}, Actual: {e.actual}")
raise
except S3UploadFailedError as e:
print(f"S3 upload failed: {e}")
raise
except Exception as e:
print(f"Unexpected error: {e}")
raise
# Example usage with custom exceptions
try:
validate_and_upload(
transfer_manager,
'/tmp/test_file.txt',
'my-bucket',
'test_file.txt',
max_size=10 * 1024 * 1024 # 10MB limit
)
except ValidationError as e:
print(f"Validation error: {e}")
except Exception as e:
print(f"Other error: {e}")Exception
├── S3UploadFailedError
├── S3DownloadFailedError
├── RetriesExceededError
├── TransferNotDoneError
├── InvalidSubscriberMethodError
├── QueueShutdownError
├── RequestExceededException
└── CancelledError
└── FatalErrorInstall with Tessl CLI
npx tessl i tessl/pypi-s3transfer