An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.
npx @tessl/cli install tessl/pypi-s3transfer@0.13.0

A Python library that provides high-level abstractions for efficient Amazon S3 uploads and downloads. S3Transfer handles multipart operations, parallel processing, bandwidth throttling, progress callbacks, and retry logic, making it the foundational transfer layer for boto3's S3 operations.

pip install s3transfer

import s3transfer
from s3transfer import S3Transfer, TransferConfig

Modern API (recommended):

from s3transfer.manager import TransferManager, TransferConfig

Basic usage with the legacy S3Transfer class:

import boto3
from s3transfer import S3Transfer, TransferConfig
# Create S3 client and transfer manager
client = boto3.client('s3', region_name='us-west-2')
transfer = S3Transfer(client)
# Upload a file
transfer.upload_file('/tmp/myfile.txt', 'my-bucket', 'myfile.txt')
# Download a file
transfer.download_file('my-bucket', 'myfile.txt', '/tmp/downloaded.txt')
# With configuration
config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,  # 8 MB
    max_concurrency=10,
    num_download_attempts=5
)
transfer = S3Transfer(client, config)

Using the modern TransferManager:

import boto3
from s3transfer.manager import TransferManager, TransferConfig
# Create transfer manager
client = boto3.client('s3', region_name='us-west-2')
config = TransferConfig(
    multipart_threshold=8 * 1024 * 1024,
    max_request_concurrency=10,
    max_bandwidth=100 * 1024 * 1024  # 100 MB/s
)
transfer_manager = TransferManager(client, config)
# Upload
with open('/tmp/myfile.txt', 'rb') as f:
    future = transfer_manager.upload(f, 'my-bucket', 'myfile.txt')
    future.result()  # Wait for completion

# Download
with open('/tmp/downloaded.txt', 'wb') as f:
    future = transfer_manager.download('my-bucket', 'myfile.txt', f)
    future.result()
# Always shut down when done
transfer_manager.shutdown()

S3Transfer is built around a two-tier API design:
The modern TransferManager provides enhanced capabilities including better resource management, more flexible configuration, and improved progress tracking compared to the legacy S3Transfer class.
The original S3Transfer class provides simple upload and download operations with basic configuration and progress callbacks.
class S3Transfer:
    def __init__(self, client, config=None, osutil=None): ...
    def upload_file(self, filename, bucket, key, callback=None, extra_args=None): ...
    def download_file(self, bucket, key, filename, extra_args=None, callback=None): ...
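For example, a progress callback can be attached to the legacy API. A minimal sketch; the bucket name, key, and file path are illustrative:

import boto3
from s3transfer import S3Transfer

def progress(bytes_transferred):
    # Invoked repeatedly with the number of bytes transferred since the previous call
    print(f"+{bytes_transferred} bytes")

client = boto3.client('s3')
transfer = S3Transfer(client)
transfer.upload_file(
    '/tmp/report.csv', 'my-bucket', 'reports/report.csv',
    callback=progress,
    extra_args={'ContentType': 'text/csv'},
)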
The recommended TransferManager class offers enhanced capabilities including upload/download/copy/delete operations, better resource management, and comprehensive configuration options.

class TransferManager:
    def __init__(self, client, config=None, osutil=None, executor_cls=None): ...
    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None): ...
    def download(self, bucket, key, fileobj, extra_args=None, subscribers=None): ...
    def copy(self, copy_source, bucket, key, extra_args=None, subscribers=None, source_client=None): ...
    def delete(self, bucket, key, extra_args=None, subscribers=None): ...
    def shutdown(self, cancel=False, cancel_msg=''): ...
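A sketch of the copy and delete operations. It assumes TransferManager can be used as a context manager (so shutdown() runs automatically); the bucket and keys are illustrative:

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
with TransferManager(client) as manager:
    # Server-side copy: copy_source identifies the existing object
    copy_future = manager.copy(
        {'Bucket': 'my-bucket', 'Key': 'old/key.txt'}, 'my-bucket', 'new/key.txt')
    copy_future.result()

    # Remove the original object once the copy has succeeded
    manager.delete('my-bucket', 'old/key.txt').result()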
Comprehensive configuration classes for controlling transfer behavior including thresholds, concurrency, retry settings, and bandwidth limits.

class TransferConfig:  # Legacy version (s3transfer)
    def __init__(self, multipart_threshold=8388608, max_concurrency=10, multipart_chunksize=8388608, num_download_attempts=5, max_io_queue=100): ...

class TransferConfig:  # Modern version (s3transfer.manager)
    def __init__(self, multipart_threshold=8388608, multipart_chunksize=8388608, max_request_concurrency=10, max_submission_concurrency=5, max_request_queue_size=1024, max_submission_queue_size=1024, max_io_queue_size=1024, io_chunksize=262144, num_download_attempts=5, max_in_memory_upload_chunks=10, max_in_memory_download_chunks=10, max_bandwidth=None): ...
Asynchronous transfer execution using futures, coordinators, and metadata tracking for monitoring transfer progress and handling completion.

class TransferFuture:
    def done(self) -> bool: ...
    def result(self): ...
    def cancel(self): ...
    @property
    def meta(self) -> TransferMeta: ...

class TransferMeta:
    @property
    def call_args(self): ...
    @property
    def transfer_id(self): ...
    @property
    def size(self): ...
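A sketch of driving a transfer through its future; it assumes manager is an existing TransferManager, and the file path, bucket, and key are illustrative:

with open('/tmp/bigfile.bin', 'rb') as f:
    future = manager.upload(f, 'my-bucket', 'bigfile.bin')

    # Transfer metadata is available while the transfer is in flight
    print(future.meta.transfer_id, future.meta.size)

    if not future.done():
        # result() blocks until completion and re-raises any transfer error
        future.result()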
File Utilities and Progress Tracking

File handling utilities including chunk readers, progress streams, and OS operations with callback support for monitoring transfer progress.

class ReadFileChunk:
    def __init__(self, fileobj, start_byte, chunk_size, full_file_size, callback=None, enable_callback=True): ...
    @classmethod
    def from_filename(cls, filename, start_byte, chunk_size, callback=None, enable_callback=True): ...
    def read(self, amount=None): ...
    def seek(self, where): ...
    def enable_callback(self): ...
    def disable_callback(self): ...

class StreamReaderProgress:
    def __init__(self, stream, callback=None): ...
    def read(self, *args, **kwargs): ...
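A sketch of reading a bounded window of a file with progress reporting, following the signatures above. It assumes ReadFileChunk is importable from the top-level s3transfer package; the path and callback are illustrative:

from s3transfer import ReadFileChunk

def on_bytes(amount):
    print(f"read {amount} bytes from the chunk")

# Expose bytes [0, 8 MB) of the file as a file-like object with callback support
chunk = ReadFileChunk.from_filename(
    '/tmp/bigfile.bin', start_byte=0, chunk_size=8 * 1024 * 1024, callback=on_bytes)
data = chunk.read(1024 * 1024)  # read up to 1 MB; triggers the callback
chunk.seek(0)                   # rewind within the chunk window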
Comprehensive bandwidth limiting using leaky bucket algorithms and consumption scheduling for controlling transfer rates.
class BandwidthLimiter:
    def __init__(self, leaky_bucket, time_utils=None): ...
    def get_bandwith_limited_stream(self, stream, transfer_coordinator): ...

class LeakyBucket:
    def __init__(self, max_rate, time_utils=None): ...
    def consume(self, amount, request_token): ...
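In normal use the limiter is engaged indirectly: setting max_bandwidth on the modern TransferConfig causes the manager to throttle streams through the leaky-bucket machinery above. A minimal sketch; the bucket and path are illustrative:

import boto3
from s3transfer.manager import TransferManager, TransferConfig

client = boto3.client('s3')
config = TransferConfig(max_bandwidth=10 * 1024 * 1024)  # ~10 MB/s aggregate cap
with TransferManager(client, config) as manager:
    manager.upload('/tmp/large.iso', 'my-bucket', 'large.iso').result()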
Event Subscribers and Callbacks

Extensible subscriber system for handling transfer events including progress updates, completion notifications, and error handling.

class BaseSubscriber:
    def on_queued(self, **kwargs): ...
    def on_progress(self, bytes_transferred, **kwargs): ...
    def on_done(self, **kwargs): ...
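A sketch of a custom subscriber that tracks cumulative progress. The method signatures follow the BaseSubscriber interface above; the commented attachment assumes an existing TransferManager and illustrative bucket/key names:

from s3transfer.subscribers import BaseSubscriber

class ProgressPrinter(BaseSubscriber):
    def __init__(self):
        self._total = 0

    def on_progress(self, bytes_transferred, **kwargs):
        self._total += bytes_transferred
        print(f"{self._total} bytes transferred")

    def on_done(self, **kwargs):
        print("transfer complete")

# future = manager.upload('/tmp/data.bin', 'my-bucket', 'data.bin',
#                         subscribers=[ProgressPrinter()])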
Comprehensive exception classes for handling transfer failures, retry exhaustion, and coordination errors.
class RetriesExceededError(Exception):
    def __init__(self, last_exception): ...
    @property
    def last_exception(self): ...

class S3UploadFailedError(Exception): ...
class S3DownloadFailedError(Exception): ...
class TransferNotDoneError(Exception): ...
class FatalError(CancelledError): ...
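A sketch of handling retry exhaustion on a download. It assumes these exceptions are importable from s3transfer.exceptions and that transfer is an existing S3Transfer instance; the bucket, key, and path are illustrative:

from s3transfer.exceptions import RetriesExceededError

try:
    transfer.download_file('my-bucket', 'flaky/key.bin', '/tmp/key.bin')
except RetriesExceededError as e:
    # The exception raised by the final failed attempt is preserved
    print("download gave up after retries:", e.last_exception)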
High-performance multiprocessing-based downloader for improved throughput by bypassing Python's GIL limitations.

class ProcessPoolDownloader:
    def __init__(self, client_kwargs=None, config=None): ...
    def download_file(self, bucket, key, filename, extra_args=None, expected_size=None): ...
    def shutdown(self): ...
    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class ProcessTransferConfig:
    def __init__(self, multipart_threshold=8388608, multipart_chunksize=8388608, max_request_processes=10): ...

class ProcessPoolTransferFuture:
    def done(self) -> bool: ...
    def result(self): ...
    def cancel(self): ...
    @property
    def meta(self): ...

class ProcessPoolTransferMeta:
    @property
    def call_args(self): ...
    @property
    def transfer_id(self): ...
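A sketch of fanning downloads out across processes; the bucket, keys, and local paths are illustrative:

from s3transfer.processpool import ProcessPoolDownloader, ProcessTransferConfig

# On platforms that spawn worker processes, run this under an
# `if __name__ == '__main__':` guard, as with any multiprocessing code.
config = ProcessTransferConfig(max_request_processes=8)
with ProcessPoolDownloader(config=config) as downloader:
    futures = [
        downloader.download_file('my-bucket', f'logs/part-{i}.gz', f'/tmp/part-{i}.gz')
        for i in range(4)
    ]
    for future in futures:
        future.result()  # block until each download completes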
AWS Common Runtime (CRT) Support

High-performance transfer manager implementation using the AWS Common Runtime for improved throughput and efficiency. It provides a drop-in replacement for TransferManager with automatic throughput optimization.

class CRTTransferManager:
    def __init__(self, crt_s3_client, crt_request_serializer, osutil=None): ...
    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None): ...
    def download(self, bucket, key, fileobj, extra_args=None, subscribers=None): ...
    def delete(self, bucket, key, extra_args=None, subscribers=None): ...
    def shutdown(self, cancel=False): ...
    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class CRTTransferFuture:
    def done(self) -> bool: ...
    def result(self, timeout=None): ...
    def cancel(self): ...
    @property
    def meta(self): ...

class BotocoreCRTRequestSerializer:
    def __init__(self, session, region_name, signature_version='s3v4'): ...
    def serialize_http_request(self, request_dict): ...

def create_s3_crt_client(region_name, num_threads=None, target_throughput=None, part_size=8388608, use_ssl=True, verify=None): ...
def acquire_crt_s3_process_lock(): ...
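A rough sketch wired together from the signatures above, assuming the classes are importable from s3transfer.crt; the exact session and credential plumbing for the serializer is an assumption and may differ by version, and the region, bucket, and path are illustrative:

import boto3
from s3transfer.crt import (
    BotocoreCRTRequestSerializer, CRTTransferManager, create_s3_crt_client)

session = boto3.session.Session()
crt_client = create_s3_crt_client('us-west-2')
serializer = BotocoreCRTRequestSerializer(session, 'us-west-2')

with CRTTransferManager(crt_client, serializer) as manager:
    manager.upload('/tmp/large.bin', 'my-bucket', 'large.bin').result()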
Global utility functions for controlling upload callback behavior in S3 operations.
def disable_upload_callbacks(request, operation_name, **kwargs):
    """
    Disable upload progress callbacks for S3 operations.

    Args:
        request: Boto3 request object
        operation_name (str): Name of the S3 operation
        **kwargs: Additional arguments
    """

def enable_upload_callbacks(request, operation_name, **kwargs):
    """
    Enable upload progress callbacks for S3 operations.

    Args:
        request: Boto3 request object
        operation_name (str): Name of the S3 operation
        **kwargs: Additional arguments
    """
from typing import Any, Callable, Dict, List

# Callback function type for progress tracking
CallbackType = Callable[[int], None]
# Extra arguments dictionary for S3 operations
ExtraArgsType = Dict[str, Any]
# Subscriber list for event handling
SubscribersType = List[BaseSubscriber]
# Transfer source for copy operations
CopySourceType = Dict[str, str]  # {'Bucket': str, 'Key': str, 'VersionId': str}

# Size constants
KB = 1024
MB = KB * KB
GB = MB * KB
# S3 limits
MAX_PARTS = 10000
MAX_SINGLE_UPLOAD_SIZE = 5 * GB
MIN_UPLOAD_CHUNKSIZE = 5 * MB
# Default configuration values
DEFAULT_MULTIPART_THRESHOLD = 8 * MB
DEFAULT_MULTIPART_CHUNKSIZE = 8 * MB
DEFAULT_MAX_CONCURRENCY = 10
# Allowed S3 operation arguments
ALLOWED_DOWNLOAD_ARGS: List[str]
ALLOWED_UPLOAD_ARGS: List[str]
ALLOWED_COPY_ARGS: List[str]
ALLOWED_DELETE_ARGS: List[str]
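A quick worked check of what these limits imply for maximum object size at the default chunk size; plain arithmetic using the documented values:

MB = 1024 * 1024
MAX_PARTS = 10000
DEFAULT_MULTIPART_CHUNKSIZE = 8 * MB

# Largest object a multipart upload can cover without increasing multipart_chunksize
max_size = MAX_PARTS * DEFAULT_MULTIPART_CHUNKSIZE
print(max_size / (1024 ** 3), "GiB")  # 78.125 GiB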