tessl/pypi-s3transfer

An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/s3transfer@0.13.x

To install, run

npx @tessl/cli install tessl/pypi-s3transfer@0.13.0


S3Transfer

A Python library that provides high-level abstractions for efficient Amazon S3 uploads and downloads. S3Transfer handles multipart operations, parallel processing, bandwidth throttling, progress callbacks, and retry logic, making it the foundational transfer layer for boto3's S3 operations.

Package Information

  • Package Name: s3transfer
  • Language: Python
  • Installation: pip install s3transfer

Core Imports

import s3transfer
from s3transfer import S3Transfer, TransferConfig

Modern API (recommended):

from s3transfer.manager import TransferManager, TransferConfig

Basic Usage

Legacy API

import boto3
from s3transfer import S3Transfer, TransferConfig

# Create S3 client and transfer manager
client = boto3.client('s3', region_name='us-west-2')
transfer = S3Transfer(client)

# Upload a file
transfer.upload_file('/tmp/myfile.txt', 'my-bucket', 'myfile.txt')

# Download a file
transfer.download_file('my-bucket', 'myfile.txt', '/tmp/downloaded.txt')

# With configuration
config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,  # 8MB
    max_concurrency=10,
    num_download_attempts=5
)
transfer = S3Transfer(client, config)

Modern API

import boto3
from s3transfer.manager import TransferManager, TransferConfig

# Create transfer manager
client = boto3.client('s3', region_name='us-west-2')
config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,
    max_request_concurrency=10,
    max_bandwidth=100 * 1024 * 1024  # 100MB/s
)
transfer_manager = TransferManager(client, config)

# Upload from an open file object (returns a TransferFuture)
with open('/tmp/myfile.txt', 'rb') as f:
    future = transfer_manager.upload(f, 'my-bucket', 'myfile.txt')
    future.result()  # Wait for completion

# Download
with open('/tmp/downloaded.txt', 'wb') as f:
    future = transfer_manager.download('my-bucket', 'myfile.txt', f)
    future.result()

# Always shutdown when done
transfer_manager.shutdown()

Architecture

S3Transfer is built around a two-tier API design:

  • High-level interfaces (S3Transfer, TransferManager): Simple methods for common operations
  • Low-level components (futures, coordinators, tasks): Fine-grained control and advanced features

Supporting subsystems span both tiers:

  • Transfer coordination: Future-based asynchronous execution with progress tracking
  • Bandwidth management: Leaky bucket rate limiting for capping transfer throughput
  • Error handling: Retry logic for interrupted downloads and failed multipart operations
  • Multipart operations: Automatic multipart uploads/downloads for large files

The modern TransferManager provides enhanced capabilities including better resource management, more flexible configuration, and improved progress tracking compared to the legacy S3Transfer class.
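
TransferManager also implements the context-manager protocol, which guarantees shutdown() runs even if an exception escapes. A brief sketch (bucket and path are placeholders):

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3', region_name='us-west-2')

# __exit__ calls shutdown(), so no explicit cleanup is needed;
# upload() also accepts a filename string instead of an open file object
with TransferManager(client) as manager:
    future = manager.upload('/tmp/myfile.txt', 'my-bucket', 'myfile.txt')
    future.result()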

Capabilities

Legacy Transfer Interface

The original S3Transfer class providing simple upload and download operations with basic configuration and progress callbacks.

class S3Transfer:
    def __init__(self, client, config=None, osutil=None): ...
    def upload_file(self, filename, bucket, key, callback=None, extra_args=None): ...
    def download_file(self, bucket, key, filename, extra_args=None, callback=None): ...
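
A small example of the legacy progress callback (bucket and paths are placeholders); the callback is invoked with the number of bytes transferred in each chunk:

import boto3
from s3transfer import S3Transfer

def progress(bytes_transferred):
    # Called repeatedly as chunks complete
    print(f'+{bytes_transferred} bytes')

client = boto3.client('s3')
transfer = S3Transfer(client)
transfer.upload_file('/tmp/myfile.txt', 'my-bucket', 'myfile.txt', callback=progress)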


Modern Transfer Manager

The recommended TransferManager class offering enhanced capabilities including upload/download/copy/delete operations, better resource management, and comprehensive configuration options.

class TransferManager:
    def __init__(self, client, config=None, osutil=None, executor_cls=None): ...
    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None): ...
    def download(self, bucket, key, fileobj, extra_args=None, subscribers=None): ...
    def copy(self, copy_source, bucket, key, extra_args=None, subscribers=None, source_client=None): ...
    def delete(self, bucket, key, extra_args=None, subscribers=None): ...
    def shutdown(self, cancel=False, cancel_msg=''): ...
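
The copy and delete operations follow the same future-based pattern; a brief sketch (bucket names are placeholders, and copy_source matches the CopySourceType shape listed under Types):

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
with TransferManager(client) as manager:
    # Server-side copy between buckets
    copy_source = {'Bucket': 'source-bucket', 'Key': 'myfile.txt'}
    manager.copy(copy_source, 'dest-bucket', 'myfile.txt').result()
    # Delete the copied object
    manager.delete('dest-bucket', 'myfile.txt').result()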


Configuration Management

Comprehensive configuration classes for controlling transfer behavior including thresholds, concurrency, retry settings, and bandwidth limits.

class TransferConfig:
    def __init__(self, multipart_threshold=8388608, max_concurrency=10, num_download_attempts=5, multipart_chunksize=8388608, max_io_queue=100): ...

class TransferConfig:  # Modern version
    def __init__(self, multipart_threshold=8388608, multipart_chunksize=8388608, max_request_concurrency=10, max_submission_concurrency=5, max_request_queue_size=1000, max_submission_queue_size=10000, max_io_queue_size=1000, io_chunksize=262144, num_download_attempts=5, max_in_memory_upload_chunks=10, max_in_memory_download_chunks=10, max_bandwidth=None): ...
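
For illustration, a modern configuration tuned for large objects on a fast link (the values are arbitrary examples, not recommendations):

from s3transfer.manager import TransferConfig

config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,  # multipart only for objects > 64MB
    multipart_chunksize=16 * 1024 * 1024,  # 16MB parts
    max_request_concurrency=20,            # more parallel part requests
    max_bandwidth=50 * 1024 * 1024,        # cap throughput at 50MB/s
)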


Future-based Coordination

Asynchronous transfer execution using futures, coordinators, and metadata tracking for monitoring transfer progress and handling completion.

class TransferFuture:
    def done(self) -> bool: ...
    def result(self): ...
    def cancel(self): ...
    @property
    def meta(self) -> TransferMeta: ...

class TransferMeta:
    @property
    def call_args(self): ...
    @property
    def transfer_id(self): ...
    @property
    def size(self): ...
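
A hedged sketch of inspecting a returned future; meta.size is populated once the transfer size is known (bucket and key are placeholders):

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
with TransferManager(client) as manager:
    future = manager.download('my-bucket', 'myfile.txt', '/tmp/downloaded.txt')
    print(future.meta.transfer_id)  # unique id assigned to this transfer
    future.result()                 # block until done; re-raises any failure
    print(future.meta.size)         # object size in bytes, once determined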


File Utilities and Progress Tracking

File handling utilities including chunk readers, progress streams, and OS operations with callback support for monitoring transfer progress.

class ReadFileChunk:
    def __init__(self, fileobj, start_byte, chunk_size, full_file_size, callback=None, enable_callback=True): ...
    @classmethod
    def from_filename(cls, filename, start_byte, chunk_size, callback=None, enable_callback=True): ...
    def read(self, amount=None): ...
    def seek(self, where): ...
    def enable_callback(self): ...
    def disable_callback(self): ...

class StreamReaderProgress:
    def __init__(self, stream, callback=None): ...
    def read(self, *args, **kwargs): ...
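
Following the signatures listed above (hedged, since these utilities are primarily used internally by the transfer machinery), a sketch that exposes a 1MB slice of a file as a file-like object:

from s3transfer.utils import ReadFileChunk

# Wrap bytes [0, 1MB) of a file; read()/seek() stay within that window
chunk = ReadFileChunk.from_filename('/tmp/myfile.txt', 0, 1024 * 1024)
data = chunk.read()   # reads at most chunk_size bytes
chunk.seek(0)         # offsets are relative to start_byte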


Bandwidth Management

Comprehensive bandwidth limiting using leaky bucket algorithms and consumption scheduling for controlling transfer rates.

class BandwidthLimiter:
    def __init__(self, leaky_bucket, time_utils=None): ...
    # Note: 'bandwith' (sic) matches the method's actual spelling in s3transfer
    def get_bandwith_limited_stream(self, stream, transfer_coordinator): ...

class LeakyBucket:
    def __init__(self, max_rate, time_utils=None): ...
    def consume(self, amount, request_token): ...
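
These classes are wired up internally by the transfer manager; the supported knob for rate limiting is max_bandwidth on the modern TransferConfig:

from s3transfer.manager import TransferConfig

# Cap aggregate transfer throughput at 10MB/s
config = TransferConfig(max_bandwidth=10 * 1024 * 1024)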


Event Subscribers and Callbacks

Extensible subscriber system for handling transfer events including progress updates, completion notifications, and error handling.

class BaseSubscriber:
    def on_queued(self, future, **kwargs): ...
    def on_progress(self, future, bytes_transferred, **kwargs): ...
    def on_done(self, future, **kwargs): ...
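
A sketch of a custom subscriber attached to an upload (bucket and key are placeholders); on_progress receives the incremental byte count:

import boto3
from s3transfer.manager import TransferManager
from s3transfer.subscribers import BaseSubscriber

class PrintingSubscriber(BaseSubscriber):
    def on_queued(self, future, **kwargs):
        print('queued', future.meta.call_args.key)

    def on_progress(self, future, bytes_transferred, **kwargs):
        print(f'+{bytes_transferred} bytes')

    def on_done(self, future, **kwargs):
        print('done')

client = boto3.client('s3')
with TransferManager(client) as manager:
    manager.upload('/tmp/myfile.txt', 'my-bucket', 'myfile.txt',
                   subscribers=[PrintingSubscriber()]).result()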


Exception Handling

Comprehensive exception classes for handling transfer failures, retry exhaustion, and coordination errors.

class RetriesExceededError(Exception):
    def __init__(self, last_exception): ...
    @property
    def last_exception(self): ...

class S3UploadFailedError(Exception): ...
class S3DownloadFailedError(Exception): ...
class TransferNotDoneError(Exception): ...
class FatalError(CancelledError): ...
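
A hedged example of catching the common failure modes with the legacy interface (names are placeholders):

import boto3
from s3transfer import S3Transfer, RetriesExceededError, S3UploadFailedError

client = boto3.client('s3')
transfer = S3Transfer(client)

try:
    transfer.download_file('my-bucket', 'myfile.txt', '/tmp/downloaded.txt')
except RetriesExceededError as e:
    # All num_download_attempts were used up; inspect the final cause
    print('download gave up:', e.last_exception)

try:
    transfer.upload_file('/tmp/myfile.txt', 'my-bucket', 'myfile.txt')
except S3UploadFailedError as e:
    print('upload failed:', e)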


Process Pool Downloads

High-performance multiprocessing-based downloader for improved throughput by bypassing Python's GIL limitations.

class ProcessPoolDownloader:
    def __init__(self, client_kwargs=None, config=None): ...
    def download_file(self, bucket, key, filename, extra_args=None, expected_size=None): ...
    def shutdown(self): ...
    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class ProcessTransferConfig:
    def __init__(self, multipart_threshold=8388608, multipart_chunksize=8388608, max_request_processes=10): ...

class ProcessPoolTransferFuture:
    def done(self) -> bool: ...
    def result(self): ...
    def cancel(self): ...
    @property
    def meta(self): ...

class ProcessPoolTransferMeta:
    @property
    def call_args(self): ...
    @property
    def transfer_id(self): ...
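
Typical usage as a context manager (names are placeholders); download_file returns a future, and shutdown happens automatically on exit:

from s3transfer.processpool import ProcessPoolDownloader

with ProcessPoolDownloader() as downloader:
    future = downloader.download_file('my-bucket', 'myfile.txt', '/tmp/downloaded.txt')
    future.result()  # block until the file is fully written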


AWS Common Runtime (CRT) Support

High-performance transfer manager implementation using AWS Common Runtime for improved throughput and efficiency. Provides drop-in replacement for TransferManager with automatic throughput optimization.

class CRTTransferManager:
    def __init__(self, crt_s3_client, crt_request_serializer, osutil=None): ...
    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None): ...
    def download(self, bucket, key, fileobj, extra_args=None, subscribers=None): ...
    def delete(self, bucket, key, extra_args=None, subscribers=None): ...
    def shutdown(self, cancel=False): ...
    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class CRTTransferFuture:
    def done(self) -> bool: ...
    def result(self, timeout=None): ...
    def cancel(self): ...
    @property
    def meta(self): ...

class BotocoreCRTRequestSerializer:
    def __init__(self, session, client_kwargs=None): ...
    def serialize_http_request(self, transfer_type, future): ...

def create_s3_crt_client(region, crt_credentials_provider=None, num_threads=None, target_throughput=None, part_size=8388608, use_ssl=True, verify=None): ...
def acquire_crt_s3_process_lock(name): ...
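
A hedged sketch of CRT-based transfers; it assumes the optional awscrt dependency (pip install s3transfer[crt]) and omits credential wiring (a crt_credentials_provider is normally passed to create_s3_crt_client for signed requests):

import botocore.session
from s3transfer.crt import (
    BotocoreCRTRequestSerializer,
    CRTTransferManager,
    create_s3_crt_client,
)

session = botocore.session.Session()
crt_client = create_s3_crt_client('us-west-2')  # credentials provider omitted for brevity
serializer = BotocoreCRTRequestSerializer(session)

with CRTTransferManager(crt_client, serializer) as manager:
    manager.upload('/tmp/myfile.txt', 'my-bucket', 'myfile.txt').result()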


Callback Control Utilities

Global utility functions for controlling upload callback behavior in S3 operations.

def disable_upload_callbacks(request, operation_name, **kwargs):
    """
    Disable upload progress callbacks for S3 operations.
    
    Args:
        request: Boto3 request object
        operation_name (str): Name of the S3 operation
        **kwargs: Additional arguments
    """

def enable_upload_callbacks(request, operation_name, **kwargs):
    """
    Enable upload progress callbacks for S3 operations.
    
    Args:
        request: Boto3 request object  
        operation_name (str): Name of the S3 operation
        **kwargs: Additional arguments
    """

Types

Core Types

# Callback function type for progress tracking
CallbackType = Callable[[int], None]

# Extra arguments dictionary for S3 operations
ExtraArgsType = Dict[str, Any]

# Subscriber list for event handling
SubscribersType = List[BaseSubscriber]

# Transfer source for copy operations
CopySourceType = Dict[str, str]  # {'Bucket': str, 'Key': str, 'VersionId': str}

Constants

# Size constants
KB = 1024
MB = KB * KB
GB = MB * KB

# S3 limits
MAX_PARTS = 10000
MAX_SINGLE_UPLOAD_SIZE = 5 * GB
MIN_UPLOAD_CHUNKSIZE = 5 * MB

# Default configuration values
DEFAULT_MULTIPART_THRESHOLD = 8 * MB
DEFAULT_MULTIPART_CHUNKSIZE = 8 * MB
DEFAULT_MAX_CONCURRENCY = 10

# Allowed S3 operation arguments
ALLOWED_DOWNLOAD_ARGS: List[str]
ALLOWED_UPLOAD_ARGS: List[str]
ALLOWED_COPY_ARGS: List[str]
ALLOWED_DELETE_ARGS: List[str]
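
As a worked example of these limits: a 100GB upload at the default 8MB chunk size would require 12,800 parts, exceeding MAX_PARTS, so the chunk size must grow (s3transfer performs a similar adjustment internally):

import math

MB = 1024 * 1024
GB = 1024 * MB
MAX_PARTS = 10000

file_size = 100 * GB
print(math.ceil(file_size / (8 * MB)))   # 12800 parts: over the limit

# Smallest chunk size that keeps the part count within MAX_PARTS
min_chunksize = math.ceil(file_size / MAX_PARTS)
print(min_chunksize / MB)                # ~10.24MB per part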