
tessl/pypi-s3transfer

An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.


Modern Transfer Manager

The TransferManager class is the recommended interface for S3 operations, providing enhanced capabilities including upload/download/copy/delete operations, better resource management, future-based asynchronous execution, and comprehensive configuration options.

Capabilities

TransferManager Class

The main transfer management class supporting all S3 operations with asynchronous execution and comprehensive resource management.

class TransferManager:
    """
    Modern S3 transfer manager with enhanced capabilities.
    
    Args:
        client: boto3 S3 client instance
        config: TransferConfig for controlling transfer behavior
        osutil: OSUtils instance for file operations
        executor_cls: Executor class for managing threads/processes
    """
    def __init__(self, client, config=None, osutil=None, executor_cls=None): ...
    
    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None) -> TransferFuture:
        """
        Upload a file-like object to S3.
        
        Args:
            fileobj: File-like object to upload (must support read())
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events
        
        Returns:
            TransferFuture: Future object for tracking transfer progress
        
        Raises:
            ValueError: If extra_args contains invalid keys
        """
    
    def download(self, bucket, key, fileobj, extra_args=None, subscribers=None) -> TransferFuture:
        """
        Download an S3 object to a file-like object.
        
        Args:
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            fileobj: File-like object to write to (must support write())
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events
        
        Returns:
            TransferFuture: Future object for tracking transfer progress
        
        Raises:
            ValueError: If extra_args contains invalid keys
        """
    
    def copy(self, copy_source, bucket, key, extra_args=None, subscribers=None, source_client=None) -> TransferFuture:
        """
        Copy an S3 object to another location.
        
        Args:
            copy_source (dict): Source object specification {'Bucket': str, 'Key': str, 'VersionId': str}
            bucket (str): Destination bucket name
            key (str): Destination object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events
            source_client: Separate S3 client for source operations
        
        Returns:
            TransferFuture: Future object for tracking transfer progress
        """
    
    def delete(self, bucket, key, extra_args=None, subscribers=None) -> TransferFuture:
        """
        Delete an S3 object.
        
        Args:
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events
        
        Returns:
            TransferFuture: Future object for tracking transfer progress
        """
    
    def shutdown(self, cancel=False, cancel_msg=''):
        """
        Shutdown the transfer manager.
        
        Args:
            cancel (bool): Whether to cancel ongoing transfers
            cancel_msg (str): Message to include with cancellation
        """
    
    @property
    def client(self):
        """The boto3 S3 client instance."""
    
    @property
    def config(self) -> TransferConfig:
        """The transfer configuration."""
    
    # Class constants for allowed operation arguments
    ALLOWED_DOWNLOAD_ARGS: List[str]
    ALLOWED_UPLOAD_ARGS: List[str]
    ALLOWED_COPY_ARGS: List[str]
    ALLOWED_DELETE_ARGS: List[str]
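
The `ValueError` documented on `upload()` and `download()` comes from an allow-list check of `extra_args` against these class constants. A minimal sketch of that check, using a hypothetical subset of argument names (the real lists live on the constants above):

```python
# Illustrative sketch of the extra_args allow-list check; the real
# TransferManager validates against ALLOWED_UPLOAD_ARGS and friends.
# This subset of argument names is for demonstration only.
ALLOWED_UPLOAD_ARGS = ['ACL', 'ContentType', 'Metadata', 'StorageClass']

def validate_extra_args(extra_args, allowed):
    """Raise ValueError if extra_args contains a key not in the allow-list."""
    for key in extra_args:
        if key not in allowed:
            raise ValueError(
                'Invalid extra_args key %r, must be one of: %s'
                % (key, ', '.join(allowed))
            )

validate_extra_args({'ContentType': 'text/csv'}, ALLOWED_UPLOAD_ARGS)  # passes
```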

TransferCoordinatorController

Controls and manages all transfer coordinators for a transfer manager, providing centralized coordination and lifecycle management.

class TransferCoordinatorController:
    """
    Controls all transfer coordinators for a manager.
    """
    def add_transfer_coordinator(self, transfer_coordinator):
        """
        Add a transfer coordinator to track.
        
        Args:
            transfer_coordinator: TransferCoordinator to manage
        """
    
    def remove_transfer_coordinator(self, transfer_coordinator):
        """
        Remove a transfer coordinator from tracking.
        
        Args:
            transfer_coordinator: TransferCoordinator to remove
        """
    
    def cancel(self, msg='', exc_type=None):
        """
        Cancel all tracked transfers.
        
        Args:
            msg (str): Cancellation message
            exc_type: Exception type for cancellation
        """
    
    def wait(self):
        """Wait for all transfers to complete."""
    
    @property
    def tracked_transfer_coordinators(self) -> Set:
        """Set of currently tracked transfer coordinators."""

Usage Examples

Basic Upload and Download

import boto3
from s3transfer.manager import TransferManager

# Create transfer manager
client = boto3.client('s3', region_name='us-west-2')
transfer_manager = TransferManager(client)

try:
    # Upload from file
    with open('/tmp/data.csv', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'data.csv')
        future.result()  # Wait for completion
    
    # Download to file
    with open('/tmp/downloaded.csv', 'wb') as f:
        future = transfer_manager.download('my-bucket', 'data.csv', f)
        future.result()  # Wait for completion

finally:
    # Always shutdown when done
    transfer_manager.shutdown()

Upload with Progress Tracking

import os
import boto3
from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber

class ProgressSubscriber(BaseSubscriber):
    def __init__(self, filename):
        self._filename = filename
        # Record the total size up front so on_progress can report a percentage
        self._size = os.path.getsize(filename)
        self._seen_so_far = 0
    
    def on_progress(self, bytes_transferred, **kwargs):
        self._seen_so_far += bytes_transferred
        if self._size:
            percentage = (self._seen_so_far / self._size) * 100
            print(f"\r{self._filename}: {percentage:.2f}% complete", end='')
    
    def on_done(self, **kwargs):
        print(f"\n{self._filename}: Transfer complete!")

# Upload with progress tracking
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    progress_subscriber = ProgressSubscriber('/tmp/large_file.dat')
    
    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'large_file.dat',
            subscribers=[progress_subscriber]
        )
        future.result()

finally:
    transfer_manager.shutdown()

Concurrent Operations

import os
import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start multiple uploads concurrently
    upload_futures = []
    files_to_upload = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']
    
    for filename in files_to_upload:
        # Pass a filename instead of an open file object: upload() returns
        # immediately, so a file opened in a with-block would be closed while
        # the transfer is still in flight. Given a path, the manager opens
        # and closes the file itself.
        future = transfer_manager.upload(filename, 'my-bucket', os.path.basename(filename))
        upload_futures.append(future)
    
    # Wait for all uploads to complete
    for future in upload_futures:
        try:
            future.result()
            print(f"Upload completed for {future.meta.call_args.key}")
        except Exception as e:
            print(f"Upload failed: {e}")

finally:
    transfer_manager.shutdown()

Copy Operations

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Copy within same account
    copy_source = {'Bucket': 'source-bucket', 'Key': 'source-file.txt'}
    future = transfer_manager.copy(
        copy_source, 'destination-bucket', 'destination-file.txt'
    )
    future.result()
    
    # Copy with version
    copy_source = {
        'Bucket': 'source-bucket', 
        'Key': 'source-file.txt',
        'VersionId': 'version-id-here'
    }
    future = transfer_manager.copy(
        copy_source, 'destination-bucket', 'versioned-copy.txt'
    )
    future.result()

finally:
    transfer_manager.shutdown()

Advanced Configuration

import time
import boto3
from s3transfer.manager import TransferManager, TransferConfig

# Create advanced configuration
config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,      # 64MB threshold
    multipart_chunksize=64 * 1024 * 1024,      # 64MB chunks
    max_request_concurrency=20,                 # 20 concurrent requests
    max_submission_concurrency=10,              # 10 concurrent submissions
    max_bandwidth=100 * 1024 * 1024,           # 100MB/s bandwidth limit
    num_download_attempts=10,                   # 10 retry attempts
    max_in_memory_upload_chunks=5,              # 5 chunks in memory
    max_in_memory_download_chunks=5             # 5 chunks in memory
)

client = boto3.client('s3')
transfer_manager = TransferManager(client, config)

try:
    # Large file operations will benefit from this configuration
    with open('/tmp/very_large_file.dat', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'very_large_file.dat')
        
        # Monitor progress
        while not future.done():
            time.sleep(1)
            if hasattr(future.meta, 'size') and future.meta.size:
                print(f"Transfer size: {future.meta.size} bytes")
        
        future.result()

finally:
    transfer_manager.shutdown()

Error Handling

import boto3
from s3transfer.manager import TransferManager
from s3transfer.exceptions import S3UploadFailedError, TransferNotDoneError

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    with open('/tmp/test_file.txt', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')
        
        try:
            # result() blocks until the transfer completes
            result = future.result()
            print("Upload successful!")
            
        except S3UploadFailedError as e:
            print(f"Upload failed: {e}")
            
        except TransferNotDoneError as e:
            print(f"Transfer not complete: {e}")
            
        except Exception as e:
            print(f"Unexpected error: {e}")

finally:
    # Shutdown with cancellation if needed
    transfer_manager.shutdown(cancel=True, cancel_msg="Application shutting down")

Delete Operations

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Simple delete
    future = transfer_manager.delete('my-bucket', 'file-to-delete.txt')
    future.result()
    
    # Delete with version
    future = transfer_manager.delete(
        'my-bucket', 'versioned-file.txt',
        extra_args={'VersionId': 'version-id-here'}
    )
    future.result()
    
    # Batch delete
    files_to_delete = ['file1.txt', 'file2.txt', 'file3.txt']
    delete_futures = []
    
    for filename in files_to_delete:
        future = transfer_manager.delete('my-bucket', filename)
        delete_futures.append(future)
    
    # Wait for all deletions
    for future in delete_futures:
        try:
            future.result()
            print(f"Deleted: {future.meta.call_args.key}")
        except Exception as e:
            print(f"Delete failed: {e}")

finally:
    transfer_manager.shutdown()

Allowed Extra Arguments

Upload Arguments

  • ACL: Access control list permissions
  • CacheControl: Cache control directives
  • ContentDisposition: Content disposition header
  • ContentEncoding: Content encoding (e.g., 'gzip')
  • ContentLanguage: Content language
  • ContentType: MIME type of the content
  • Expires: Expiration date
  • GrantFullControl: Full control permissions
  • GrantRead: Read permissions
  • GrantReadACP: Read ACP permissions
  • GrantWriteACP: Write ACP permissions
  • Metadata: User-defined metadata dictionary
  • RequestPayer: Request payer setting
  • ServerSideEncryption: Server-side encryption method
  • StorageClass: Storage class (STANDARD, REDUCED_REDUNDANCY, etc.)
  • SSECustomerAlgorithm: Customer-provided encryption algorithm
  • SSECustomerKey: Customer-provided encryption key
  • SSECustomerKeyMD5: MD5 hash of customer encryption key
  • SSEKMSKeyId: KMS key ID for encryption
  • SSEKMSEncryptionContext: KMS encryption context
  • Tagging: Object tags
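
A typical `extra_args` dictionary drawn from the list above might look like the following; the bucket, key, and metadata values are placeholders, and the commented call shows where it plugs in:

```python
# Hypothetical extra_args built from the allowed upload arguments above.
extra_args = {
    'ContentType': 'text/csv',
    'Metadata': {'department': 'analytics'},
    'ServerSideEncryption': 'AES256',
    'StorageClass': 'STANDARD_IA',
}

# future = transfer_manager.upload(
#     '/tmp/data.csv', 'my-bucket', 'data.csv', extra_args=extra_args)
```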

Download Arguments

  • VersionId: Specific version of the object to download
  • SSECustomerAlgorithm: Customer-provided encryption algorithm
  • SSECustomerKey: Customer-provided encryption key
  • SSECustomerKeyMD5: MD5 hash of customer encryption key
  • RequestPayer: Request payer setting

Copy Arguments

  • All upload arguments plus:
  • MetadataDirective: Whether to copy or replace metadata
  • TaggingDirective: Whether to copy or replace tags
  • CopySourceIfMatch: Copy only if ETag matches
  • CopySourceIfModifiedSince: Copy only if modified since date
  • CopySourceIfNoneMatch: Copy only if ETag doesn't match
  • CopySourceIfUnmodifiedSince: Copy only if unmodified since date
  • CopySourceSSECustomerAlgorithm: Source encryption algorithm
  • CopySourceSSECustomerKey: Source encryption key
  • CopySourceSSECustomerKeyMD5: Source encryption key MD5

Delete Arguments

  • VersionId: Specific version to delete
  • MFA: MFA authentication for delete operations
  • RequestPayer: Request payer setting

Best Practices

  1. Always call shutdown(): Use try/finally or context managers to ensure cleanup
  2. Handle futures properly: Check future.done() and handle exceptions from future.result()
  3. Use appropriate configuration: Tune settings based on file sizes and network conditions
  4. Monitor progress: Use subscribers for long-running transfers
  5. Handle errors gracefully: Catch specific exceptions and implement retry logic where appropriate
  6. Resource management: Avoid creating multiple TransferManager instances unnecessarily
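
As an alternative to try/finally (item 1), `TransferManager` also supports the context-manager protocol: leaving the `with` block shuts the manager down, and an exception escaping the block cancels in-flight transfers. The stand-in class below sketches that pattern so the example runs without S3 access; with the real library the same shape is `with TransferManager(client) as manager: ...`:

```python
# Sketch of the shutdown-on-exit pattern using a stand-in manager.
class StandInManager:
    def __init__(self):
        self.shut_down = False
        self.was_cancelled = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Cancel in-flight work only if an exception escaped the block.
        self.shutdown(cancel=exc_type is not None)
        return False  # never swallow the exception

    def shutdown(self, cancel=False, cancel_msg=''):
        self.shut_down = True
        self.was_cancelled = cancel

with StandInManager() as manager:
    pass  # queue transfers here
```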
