tessl/pypi-s3transfer

An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.


docs/exception-handling.md

Exception Handling

Comprehensive exception classes for handling transfer failures, retry exhaustion, coordination errors, and other exceptional conditions in S3 transfer operations.

Capabilities

Transfer Operation Exceptions

Core exceptions for S3 transfer operation failures with specific error contexts and recovery information.

class RetriesExceededError(Exception):
    """
    Raised when the maximum number of retries is exceeded during transfer operations.
    
    Args:
        last_exception: The final exception that caused the retry failure
    """
    def __init__(self, last_exception): ...
    
    @property
    def last_exception(self):
        """
        The last exception that occurred before retries were exhausted.
        
        Returns:
            Exception: The final exception that caused failure
        """

class S3UploadFailedError(Exception):
    """
    Raised when S3 upload operation fails.
    
    Typically wraps underlying exceptions from S3 operations or network failures.
    """

class S3DownloadFailedError(Exception):
    """
    Raised when S3 download operation fails.
    
    Typically wraps underlying exceptions from S3 operations or network failures.
    """

Coordination and State Exceptions

Exceptions related to transfer coordination, state management, and lifecycle issues.

class TransferNotDoneError(Exception):
    """
    Raised when attempting transfer operations before completion.
    
    Occurs when trying to access results or perform operations on transfers
    that haven't finished executing.
    """

class FatalError(CancelledError):
    """
    Fatal error in TransferManager that causes immediate shutdown.
    
    Inherits from CancelledError to indicate that the transfer was cancelled
    due to a fatal condition.
    """

class InvalidSubscriberMethodError(Exception):
    """
    Raised when subscriber method is invalid or improperly implemented.
    
    Args:
        subscriber: The subscriber object with invalid method
        method_name (str): Name of the invalid method
        reason (str): Description of why the method is invalid
    """
    def __init__(self, subscriber, method_name: str, reason: str): ...

Queue and Resource Exceptions

Exceptions related to queue operations and resource management.

class QueueShutdownError(Exception):
    """
    Raised when attempting to put items in a shutdown queue.
    
    Occurs when trying to add items to a ShutdownQueue after it has been
    triggered for shutdown.
    """

Bandwidth and Rate Limiting Exceptions

Exceptions specific to bandwidth management and rate limiting operations.

class RequestExceededException(Exception):
    """
    Raised when bandwidth request exceeds available capacity.
    
    Args:
        requested_amt (int): Number of bytes that were requested
        retry_time (float): Time when request can be retried
    """
    def __init__(self, requested_amt: int, retry_time: float): ...
    
    @property
    def requested_amt(self) -> int:
        """Number of bytes that were requested."""
    
    @property
    def retry_time(self) -> float:
        """Time when request can be retried."""

Usage Examples

Basic Exception Handling

from s3transfer.manager import TransferManager
from s3transfer.exceptions import (
    S3UploadFailedError, S3DownloadFailedError, 
    TransferNotDoneError, RetriesExceededError
)
import boto3

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Upload with comprehensive error handling; keep the file open until
    # the transfer completes, since upload() reads from it in the background
    with open('/tmp/test_file.txt', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')

        try:
            # Wait for completion
            result = future.result()
            print("Upload completed successfully!")

        except S3UploadFailedError as e:
            print(f"Upload failed: {e}")
            # Handle upload-specific failure

        except TransferNotDoneError as e:
            print(f"Transfer not complete: {e}")
            # Handle premature access to results

        except RetriesExceededError as e:
            print(f"Retries exhausted: {e}")
            print(f"Last exception: {e.last_exception}")
            # Handle retry exhaustion

        except Exception as e:
            print(f"Unexpected error: {e}")
            # Handle any other exceptions

finally:
    transfer_manager.shutdown()

Retry Logic with Exception Handling

import time
import random

import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

def upload_with_custom_retry(transfer_manager, fileobj, bucket, key, max_retries=3):
    """Upload with custom retry logic and exception handling."""
    
    for attempt in range(max_retries + 1):
        try:
            future = transfer_manager.upload(fileobj, bucket, key)
            result = future.result()
            print(f"Upload succeeded on attempt {attempt + 1}")
            return result
            
        except S3UploadFailedError as e:
            print(f"Upload attempt {attempt + 1} failed: {e}")
            
            if attempt < max_retries:
                # Exponential backoff with jitter
                delay = (2 ** attempt) + random.uniform(0, 1)
                print(f"Retrying in {delay:.2f} seconds...")
                time.sleep(delay)
                
                # Reset file position for retry
                fileobj.seek(0)
            else:
                print("All retry attempts exhausted")
                raise
                
        except RetriesExceededError as e:
            print(f"Internal retries exhausted on attempt {attempt + 1}")
            print(f"Last internal exception: {e.last_exception}")
            
            if attempt < max_retries:
                delay = (2 ** attempt) + random.uniform(0, 1)
                print(f"Retrying entire operation in {delay:.2f} seconds...")
                time.sleep(delay)
                fileobj.seek(0)
            else:
                raise
                
        except Exception as e:
            print(f"Unexpected error on attempt {attempt + 1}: {e}")
            if attempt >= max_retries:
                raise

# Use custom retry logic
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    with open('/tmp/test_file.txt', 'rb') as f:
        upload_with_custom_retry(transfer_manager, f, 'my-bucket', 'test_file.txt')

finally:
    transfer_manager.shutdown()

Handling Download Exceptions

import os

import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import S3DownloadFailedError, TransferNotDoneError

def safe_download(transfer_manager, bucket, key, filename):
    """Download with comprehensive error handling and cleanup."""
    
    temp_filename = filename + '.tmp'
    
    try:
        # Download to temporary file first
        with open(temp_filename, 'wb') as f:
            future = transfer_manager.download(bucket, key, f)
            result = future.result()
        
        # Verify download completed successfully
        if os.path.getsize(temp_filename) == 0:
            raise S3DownloadFailedError("Downloaded file is empty")
        
        # Move temporary file to final location
        os.rename(temp_filename, filename)
        print(f"Download completed: {filename}")
        return True
        
    except S3DownloadFailedError as e:
        print(f"Download failed: {e}")
        # Clean up temporary file
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        return False
        
    except TransferNotDoneError as e:
        print(f"Download not complete: {e}")
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        return False
        
    except OSError as e:
        print(f"File system error: {e}")
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        return False
        
    except Exception as e:
        print(f"Unexpected download error: {e}")
        if os.path.exists(temp_filename):
            os.remove(temp_filename)
        raise

# Use safe download
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    success = safe_download(transfer_manager, 'my-bucket', 'test_file.txt', '/tmp/downloaded.txt')
    if success:
        print("Download completed successfully")
    else:
        print("Download failed")

finally:
    transfer_manager.shutdown()

Bandwidth Exception Handling

import time

from s3transfer.bandwidth import BandwidthLimiter, LeakyBucket, RequestExceededException
from s3transfer.futures import TransferCoordinator

def handle_bandwidth_limited_operation():
    """Handle bandwidth-limited operations with proper exception handling."""
    
    # Create very restrictive bandwidth limiter for demonstration
    max_rate = 1024  # 1KB/s
    leaky_bucket = LeakyBucket(max_rate)
    bandwidth_limiter = BandwidthLimiter(leaky_bucket)
    
    class TestStream:
        def __init__(self, data):
            self.data = data
            self.position = 0
        
        def read(self, amount=None):
            if amount is None:
                amount = len(self.data) - self.position
            end = min(self.position + amount, len(self.data))
            result = self.data[self.position:end]
            self.position = end
            return result
    
    # Create bandwidth-limited stream
    test_data = b'x' * 10240  # 10KB
    stream = TestStream(test_data)
    coordinator = TransferCoordinator()
    
    # Note: "bandwith" (sic) is the method's actual spelling in s3transfer
    limited_stream = bandwidth_limiter.get_bandwith_limited_stream(stream, coordinator)
    
    total_read = 0
    retries = 0
    max_retries = 10
    
    print("Starting bandwidth-limited read with exception handling...")
    
    while total_read < len(test_data) and retries < max_retries:
        try:
            # Try to read data
            chunk = limited_stream.read(2048)  # Try to read 2KB
            
            if chunk:
                total_read += len(chunk)
                print(f"Read {len(chunk)} bytes (total: {total_read})")
            else:
                break
                
        except RequestExceededException as e:
            retries += 1
            print(f"Bandwidth limit exceeded: requested {e.requested_amt} bytes")
            print(f"Can retry after: {e.retry_time}")
            
            # Calculate wait time
            wait_time = e.retry_time - time.time()
            if wait_time > 0:
                print(f"Waiting {wait_time:.2f} seconds...")
                time.sleep(wait_time)
            
            print(f"Retry attempt {retries}")
            
        except Exception as e:
            print(f"Unexpected bandwidth error: {e}")
            break
    
    if retries >= max_retries:
        print("Maximum retries exceeded for bandwidth limiting")
    else:
        print(f"Completed reading {total_read} bytes with {retries} retries")

# Run bandwidth exception handling example
handle_bandwidth_limited_operation()

Queue Exception Handling

from s3transfer import ShutdownQueue, QueueShutdownError
import threading
import time

def demonstrate_queue_exception_handling():
    """Demonstrate handling of queue shutdown exceptions."""
    
    # Create shutdown queue
    queue = ShutdownQueue(maxsize=10)
    
    def producer_thread():
        """Producer that handles queue shutdown gracefully."""
        try:
            for i in range(20):
                try:
                    item = f"item_{i}"
                    queue.put(item, timeout=1)
                    print(f"Produced: {item}")
                    time.sleep(0.1)
                    
                except QueueShutdownError:
                    print("Producer: Queue has been shutdown, stopping production")
                    break
                    
                except Exception as e:
                    print(f"Producer error: {e}")
                    break
        
        except Exception as e:
            print(f"Producer thread error: {e}")
    
    def consumer_thread():
        """Consumer that processes items until queue is shutdown."""
        consumed_count = 0
        
        try:
            while True:
                try:
                    item = queue.get(timeout=2)
                    print(f"Consumed: {item}")
                    consumed_count += 1
                    time.sleep(0.2)
                    
                except Exception:  # queue.Empty on timeout, or another queue error
                    print("Consumer: No more items or queue error")
                    break
                    
        except Exception as e:
            print(f"Consumer thread error: {e}")
        
        finally:
            print(f"Consumer processed {consumed_count} items")
    
    # Start producer and consumer threads
    producer = threading.Thread(target=producer_thread)
    consumer = threading.Thread(target=consumer_thread)
    
    producer.start()
    consumer.start()
    
    # Let them run for a bit, then shutdown queue
    time.sleep(1)
    print("Triggering queue shutdown...")
    queue.trigger_shutdown()
    
    # Wait for threads to complete
    producer.join()
    consumer.join()
    
    # Try to put item after shutdown (will raise exception)
    try:
        queue.put("post_shutdown_item")
    except QueueShutdownError:
        print("Correctly caught QueueShutdownError after shutdown")

# Run queue exception handling example
demonstrate_queue_exception_handling()

Exception Logging and Monitoring

import logging
from datetime import datetime

from s3transfer.bandwidth import RequestExceededException
from s3transfer.exceptions import (
    FatalError, InvalidSubscriberMethodError, RetriesExceededError,
    S3DownloadFailedError, S3UploadFailedError, TransferNotDoneError,
)

class TransferExceptionHandler:
    """Centralized exception handler for transfer operations."""
    
    def __init__(self, logger_name="s3transfer_exceptions"):
        self.logger = logging.getLogger(logger_name)
        self.exception_counts = {}
        self.last_exception_time = {}
    
    def handle_exception(self, exception, operation="unknown", **context):
        """Handle and log transfer exceptions with context."""
        
        exception_type = type(exception).__name__
        current_time = datetime.now()
        
        # Update statistics
        self.exception_counts[exception_type] = self.exception_counts.get(exception_type, 0) + 1
        self.last_exception_time[exception_type] = current_time
        
        # Create detailed log entry
        log_data = {
            'exception_type': exception_type,
            'exception_message': str(exception),
            'operation': operation,
            'timestamp': current_time.isoformat(),
            'count': self.exception_counts[exception_type],
            **context
        }
        
        # Log based on exception type
        if isinstance(exception, (S3UploadFailedError, S3DownloadFailedError)):
            self.logger.error(f"S3 operation failed: {exception}", extra=log_data)
            
        elif isinstance(exception, RetriesExceededError):
            self.logger.error(f"Retries exhausted: {exception}", extra=log_data)
            if hasattr(exception, 'last_exception') and exception.last_exception:
                self.logger.error(f"Last retry exception: {exception.last_exception}")
                
        elif isinstance(exception, TransferNotDoneError):
            self.logger.warning(f"Premature access attempt: {exception}", extra=log_data)
            
        elif isinstance(exception, FatalError):
            self.logger.critical(f"Fatal transfer error: {exception}", extra=log_data)
            
        elif isinstance(exception, RequestExceededException):
            self.logger.info(f"Bandwidth limit exceeded: {exception}", extra=log_data)
            
        else:
            self.logger.error(f"Unexpected exception: {exception}", extra=log_data)
        
        # Log stack trace for debugging
        self.logger.debug("Stack trace:", exc_info=True)
        
        return log_data
    
    def get_exception_summary(self):
        """Get summary of handled exceptions."""
        return {
            'exception_counts': dict(self.exception_counts),
            'last_exception_times': {
                k: v.isoformat() for k, v in self.last_exception_time.items()
            }
        }
    
    def should_retry(self, exception, attempt_count, max_attempts=3):
        """Determine if operation should be retried based on exception type."""
        
        if attempt_count >= max_attempts:
            return False
        
        # Don't retry fatal errors
        if isinstance(exception, FatalError):
            return False
        
        # Don't retry invalid operations
        if isinstance(exception, (TransferNotDoneError, InvalidSubscriberMethodError)):
            return False
        
        # Retry network and S3 errors
        if isinstance(exception, (S3UploadFailedError, S3DownloadFailedError, RetriesExceededError)):
            return True
        
        # Retry bandwidth limitations with delay
        if isinstance(exception, RequestExceededException):
            return True
        
        # Conservative approach for unknown exceptions
        return False

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Use exception handler
exception_handler = TransferExceptionHandler()

def robust_transfer_operation(transfer_manager, operation_func, max_attempts=3):
    """Perform transfer operation with comprehensive exception handling."""
    
    for attempt in range(max_attempts):
        try:
            result = operation_func()
            return result
            
        except Exception as e:
            # Handle exception with context
            context = {
                'attempt': attempt + 1,
                'max_attempts': max_attempts,
                'operation_func': operation_func.__name__ if hasattr(operation_func, '__name__') else 'unknown'
            }
            
            exception_handler.handle_exception(e, "transfer_operation", **context)
            
            # Decide whether to retry
            if exception_handler.should_retry(e, attempt, max_attempts):
                if attempt < max_attempts - 1:
                    delay = 2 ** attempt  # Exponential backoff
                    print(f"Retrying in {delay} seconds (attempt {attempt + 2}/{max_attempts})")
                    time.sleep(delay)
                    continue
            
            # Re-raise if not retrying or max attempts reached
            # (bare `raise` preserves the original traceback)
            raise
    
    # This shouldn't be reached, but just in case
    raise Exception("Maximum attempts reached without success")

# Example usage
import time

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    def upload_operation():
        with open('/tmp/test_file.txt', 'rb') as f:
            future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')
            return future.result()
    
    # Perform operation with exception handling
    result = robust_transfer_operation(transfer_manager, upload_operation)
    print("Operation completed successfully!")
    
    # Print exception summary
    summary = exception_handler.get_exception_summary()
    if summary['exception_counts']:
        print("Exception summary:", summary)

except Exception as e:
    print(f"Final failure: {e}")

finally:
    transfer_manager.shutdown()

Custom Exception Classes

import os

from s3transfer.exceptions import S3UploadFailedError

class CustomTransferError(Exception):
    """Base class for custom transfer exceptions."""
    pass

class ValidationError(CustomTransferError):
    """Raised when transfer validation fails."""
    
    def __init__(self, message, validation_type=None, expected=None, actual=None):
        super().__init__(message)
        self.validation_type = validation_type
        self.expected = expected
        self.actual = actual

class QuotaExceededError(CustomTransferError):
    """Raised when transfer quota is exceeded."""
    
    def __init__(self, message, quota_type=None, limit=None, current=None):
        super().__init__(message)
        self.quota_type = quota_type
        self.limit = limit
        self.current = current

def validate_and_upload(transfer_manager, filename, bucket, key, max_size=None):
    """Upload with custom validation and exception handling."""
    
    try:
        # Validate file size
        file_size = os.path.getsize(filename)
        if max_size and file_size > max_size:
            raise ValidationError(
                f"File too large: {file_size} bytes",
                validation_type="file_size",
                expected=f"<= {max_size}",
                actual=file_size
            )
        
        # Validate file exists and is readable
        if not os.path.isfile(filename):
            raise ValidationError(f"File not found: {filename}", validation_type="file_existence")
        
        if not os.access(filename, os.R_OK):
            raise ValidationError(f"File not readable: {filename}", validation_type="file_permissions")
        
        # Perform upload
        with open(filename, 'rb') as f:
            future = transfer_manager.upload(f, bucket, key)
            result = future.result()
        
        print(f"Upload successful: {filename} -> s3://{bucket}/{key}")
        return result
        
    except ValidationError as e:
        print(f"Validation failed: {e}")
        if e.validation_type:
            print(f"  Type: {e.validation_type}")
        if e.expected and e.actual:
            print(f"  Expected: {e.expected}, Actual: {e.actual}")
        raise
        
    except S3UploadFailedError as e:
        print(f"S3 upload failed: {e}")
        raise
        
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise

# Example usage with custom exceptions
try:
    validate_and_upload(
        transfer_manager, 
        '/tmp/test_file.txt', 
        'my-bucket', 
        'test_file.txt',
        max_size=10 * 1024 * 1024  # 10MB limit
    )
except ValidationError as e:
    print(f"Validation error: {e}")
except Exception as e:
    print(f"Other error: {e}")

Exception Hierarchy

Exception
├── S3UploadFailedError
├── S3DownloadFailedError
├── RetriesExceededError
├── TransferNotDoneError
├── InvalidSubscriberMethodError
├── QueueShutdownError
├── RequestExceededException
└── CancelledError
    └── FatalError

Best Practices

Exception Handling Strategy

  1. Catch specific exceptions: Handle known exception types specifically rather than using broad except clauses
  2. Preserve exception context: Use exception chaining to maintain original error information
  3. Implement proper cleanup: Use try/finally or context managers for resource cleanup
  4. Log exceptions appropriately: Include sufficient context for debugging
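Point 3 can be packaged as a small context manager so shutdown always runs, even when a transfer raises. This is a sketch; `managed_transfers` is not part of s3transfer (recent `TransferManager` versions can also be used directly in a `with` statement):

```python
import contextlib

@contextlib.contextmanager
def managed_transfers(transfer_manager):
    """Yield the manager and guarantee shutdown on exit, even on error."""
    try:
        yield transfer_manager
    finally:
        transfer_manager.shutdown()

# Usage (illustrative):
#     with managed_transfers(TransferManager(client)) as tm:
#         tm.upload(fileobj, 'my-bucket', 'key').result()
```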

Retry Logic

  1. Use exponential backoff: Increase delay between retries to avoid overwhelming services
  2. Set maximum retry limits: Prevent infinite retry loops
  3. Consider exception types: Not all exceptions should trigger retries
  4. Add jitter: Randomize retry timing to avoid thundering herd problems
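Points 1 and 4 combine into one helper: exponential growth capped at a maximum, with "full jitter" so concurrent clients spread their retries. A sketch with illustrative names and defaults:

```python
import random

def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Delay before retry `attempt` (0-based): capped exponential, full jitter."""
    return random.uniform(0, min(cap, base * (2 ** attempt)))

for attempt in range(4):
    print(f"attempt {attempt}: sleep up to {min(30.0, 2 ** attempt):.0f}s")
```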

Resource Management

  1. Clean up on failure: Remove partial files, close connections, etc.
  2. Handle shutdown gracefully: Respond appropriately to shutdown signals
  3. Monitor resource usage: Track and limit resource consumption
  4. Implement circuit breakers: Stop operations when failure rates are high
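Point 4 can be as simple as a consecutive-failure breaker: after `threshold` failures in a row the circuit opens and operations are refused until `cooldown` elapses. This is an illustrative sketch, not an s3transfer API:

```python
import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; allow retries after `cooldown`."""

    def __init__(self, threshold: int = 5, cooldown: float = 60.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None

    def allow(self) -> bool:
        # Closed circuit, or open circuit whose cooldown has elapsed
        if self.opened_at is None:
            return True
        return time.monotonic() - self.opened_at >= self.cooldown

    def record_success(self) -> None:
        self.failures = 0
        self.opened_at = None

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.threshold:
            self.opened_at = time.monotonic()
```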

Monitoring and Debugging

  1. Log exception details: Include operation context, timing, and parameters
  2. Track exception patterns: Monitor exception frequency and types
  3. Use structured logging: Make logs searchable and analyzable
  4. Implement alerting: Notify operators of critical exceptions
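Point 3 in practice: a formatter that emits one JSON object per record makes the context passed via `extra=` (as in the `TransferExceptionHandler` example above) searchable. A sketch; the field names are illustrative:

```python
import json
import logging

class JsonFormatter(logging.Formatter):
    """Serialize each record, plus selected `extra` fields, as one JSON line."""

    def format(self, record):
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
            "exception_type": getattr(record, "exception_type", None),
            "operation": getattr(record, "operation", None),
        }
        return json.dumps(payload)

logger = logging.getLogger("s3transfer_exceptions")
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)
logger.error("S3 operation failed",
             extra={"exception_type": "S3UploadFailedError", "operation": "upload"})
```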

Install with Tessl CLI

npx tessl i tessl/pypi-s3transfer
