An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads and downloads, with multipart transfers, progress callbacks, and retry logic.

The TransferManager class is the recommended interface for S3 operations. It provides upload, download, copy, and delete operations, better resource management, future-based asynchronous execution, and comprehensive configuration options.

The main transfer management class, supporting all S3 operations with asynchronous execution and comprehensive resource management:
class TransferManager:
    """
    Modern S3 transfer manager with enhanced capabilities.

    Args:
        client: boto3 S3 client instance
        config: TransferConfig for controlling transfer behavior
        osutil: OSUtils instance for file operations
        executor_cls: Executor class for managing threads/processes
    """

    def __init__(self, client, config=None, osutil=None, executor_cls=None): ...

    def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None) -> "TransferFuture":
        """
        Upload a file-like object to S3.

        Args:
            fileobj: File-like object to upload (must support read())
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events

        Returns:
            TransferFuture: Future object for tracking transfer progress

        Raises:
            ValueError: If extra_args contains invalid keys
        """

    def download(self, bucket, key, fileobj, extra_args=None, subscribers=None) -> "TransferFuture":
        """
        Download an S3 object to a file-like object.

        Args:
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            fileobj: File-like object to write to (must support write())
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events

        Returns:
            TransferFuture: Future object for tracking transfer progress

        Raises:
            ValueError: If extra_args contains invalid keys
        """

    def copy(self, copy_source, bucket, key, extra_args=None, subscribers=None, source_client=None) -> "TransferFuture":
        """
        Copy an S3 object to another location.

        Args:
            copy_source (dict): Source object specification
                {'Bucket': str, 'Key': str, 'VersionId': str}
            bucket (str): Destination bucket name
            key (str): Destination object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events
            source_client: Separate S3 client for source operations

        Returns:
            TransferFuture: Future object for tracking transfer progress
        """

    def delete(self, bucket, key, extra_args=None, subscribers=None) -> "TransferFuture":
        """
        Delete an S3 object.

        Args:
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            extra_args (dict, optional): Additional S3 operation arguments
            subscribers (list, optional): List of subscriber objects for events

        Returns:
            TransferFuture: Future object for tracking transfer progress
        """

    def shutdown(self, cancel=False, cancel_msg=''):
        """
        Shutdown the transfer manager.

        Args:
            cancel (bool): Whether to cancel ongoing transfers
            cancel_msg (str): Message to include with cancellation
        """

    @property
    def client(self):
        """The boto3 S3 client instance."""

    @property
    def config(self) -> "TransferConfig":
        """The transfer configuration."""

    # Class constants for allowed operation arguments (the keys accepted in
    # each method's ``extra_args`` dict). String annotations keep this stub
    # importable without the real types in scope.
    ALLOWED_DOWNLOAD_ARGS: "List[str]"
    ALLOWED_UPLOAD_ARGS: "List[str]"
    ALLOWED_COPY_ARGS: "List[str]"
    ALLOWED_DELETE_ARGS: List[str]

Controls and manages all transfer coordinators for a transfer manager, providing centralized coordination and lifecycle management:
class TransferCoordinatorController:
    """
    Controls all transfer coordinators for a manager.
    """

    def add_transfer_coordinator(self, transfer_coordinator):
        """
        Add a transfer coordinator to track.

        Args:
            transfer_coordinator: TransferCoordinator to manage
        """

    def remove_transfer_coordinator(self, transfer_coordinator):
        """
        Remove a transfer coordinator from tracking.

        Args:
            transfer_coordinator: TransferCoordinator to remove
        """

    def cancel(self, msg='', exc_type=None):
        """
        Cancel all tracked transfers.

        Args:
            msg (str): Cancellation message
            exc_type: Exception type for cancellation
        """

    def wait(self):
        """Wait for all transfers to complete."""

    @property
    def tracked_transfer_coordinators(self) -> "Set":
        """Set of currently tracked transfer coordinators."""
"""Set of currently tracked transfer coordinators."""

import boto3
# Example: basic upload and download with TransferManager.
from s3transfer.manager import TransferManager

# Create transfer manager
client = boto3.client('s3', region_name='us-west-2')
transfer_manager = TransferManager(client)
try:
    # Upload from file. Wait for completion inside the `with` block so the
    # file object stays open for the whole transfer.
    with open('/tmp/data.csv', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'data.csv')
        future.result()  # Wait for completion

    # Download to file
    with open('/tmp/downloaded.csv', 'wb') as f:
        future = transfer_manager.download('my-bucket', 'data.csv', f)
        future.result()  # Wait for completion
finally:
    # Always shutdown when done
    transfer_manager.shutdown()

from s3transfer.manager import TransferManager
# Example: upload with a progress-reporting subscriber.
import os

import boto3
from s3transfer.subscribers import BaseSubscriber


class ProgressSubscriber(BaseSubscriber):
    """Print transfer progress as a percentage of the file's total size."""

    def __init__(self, filename):
        self._filename = filename
        # Capture the file size up front so on_progress can compute a
        # percentage (the original left _size as None and never printed).
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0

    def on_progress(self, bytes_transferred, **kwargs):
        # Called repeatedly with the number of bytes moved since last call.
        self._seen_so_far += bytes_transferred
        if self._size:
            percentage = (self._seen_so_far / self._size) * 100
            print(f"\r{self._filename}: {percentage:.2f}% complete", end='')

    def on_done(self, **kwargs):
        print(f"\n{self._filename}: Transfer complete!")


# Upload with progress tracking
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    progress_subscriber = ProgressSubscriber('/tmp/large_file.dat')
    with open('/tmp/large_file.dat', 'rb') as f:
        future = transfer_manager.upload(
            f, 'my-bucket', 'large_file.dat',
            subscribers=[progress_subscriber]
        )
        # Set file size for accurate progress
        future.meta.provide_transfer_size(os.path.getsize('/tmp/large_file.dat'))
        future.result()
finally:
    transfer_manager.shutdown()

from s3transfer.manager import TransferManager
# Example: start several uploads concurrently, then wait for all of them.
import concurrent.futures
import contextlib
import os

import boto3

client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    # Start multiple uploads concurrently
    upload_futures = []
    files_to_upload = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']
    # Keep every file object open until its transfer has finished: the
    # original closed each file as soon as its `with` block exited, while
    # the upload was still in flight.
    with contextlib.ExitStack() as stack:
        for filename in files_to_upload:
            f = stack.enter_context(open(filename, 'rb'))
            future = transfer_manager.upload(
                f, 'my-bucket', os.path.basename(filename)
            )
            upload_futures.append(future)

        # Wait for all uploads to complete
        for future in upload_futures:
            try:
                future.result()
                print(f"Upload completed for {future.meta.call_args.key}")
            except Exception as e:
                print(f"Upload failed: {e}")
finally:
    transfer_manager.shutdown()

from s3transfer.manager import TransferManager
# Example: server-side copies, including a versioned source object.
import boto3

client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    # Copy within same account
    copy_source = {'Bucket': 'source-bucket', 'Key': 'source-file.txt'}
    future = transfer_manager.copy(
        copy_source, 'destination-bucket', 'destination-file.txt'
    )
    future.result()

    # Copy with version
    copy_source = {
        'Bucket': 'source-bucket',
        'Key': 'source-file.txt',
        'VersionId': 'version-id-here'
    }
    future = transfer_manager.copy(
        copy_source, 'destination-bucket', 'versioned-copy.txt'
    )
    future.result()
finally:
    transfer_manager.shutdown()

from s3transfer.manager import TransferManager, TransferConfig
# Example: tuning TransferConfig for large-file workloads.
import time

import boto3

# Create advanced configuration
config = TransferConfig(
    multipart_threshold=64 * 1024 * 1024,   # 64MB threshold
    multipart_chunksize=64 * 1024 * 1024,   # 64MB chunks
    max_request_concurrency=20,             # 20 concurrent requests
    max_submission_concurrency=10,          # 10 concurrent submissions
    max_bandwidth=100 * 1024 * 1024,        # 100MB/s bandwidth limit
    num_download_attempts=10,               # 10 retry attempts
    max_in_memory_upload_chunks=5,          # 5 chunks in memory
    max_in_memory_download_chunks=5         # 5 chunks in memory
)
client = boto3.client('s3')
transfer_manager = TransferManager(client, config)
try:
    # Large file operations will benefit from this configuration.
    # Monitor and wait inside the `with` so the file stays open for the
    # entire transfer.
    with open('/tmp/very_large_file.dat', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'very_large_file.dat')
        # Monitor progress
        while not future.done():
            time.sleep(1)
            if hasattr(future.meta, 'size') and future.meta.size:
                print(f"Transfer size: {future.meta.size} bytes")
        future.result()
finally:
    transfer_manager.shutdown()

from s3transfer.manager import TransferManager
# Example: handling transfer-specific exceptions around future.result().
import boto3
from s3transfer.exceptions import S3UploadFailedError, TransferNotDoneError

client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    with open('/tmp/test_file.txt', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'test_file.txt')
        try:
            # Wait for completion
            result = future.result()
            print("Upload successful!")
        except S3UploadFailedError as e:
            print(f"Upload failed: {e}")
        except TransferNotDoneError as e:
            print(f"Transfer not complete: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")
finally:
    # Shutdown with cancellation if needed
    transfer_manager.shutdown(cancel=True, cancel_msg="Application shutting down")

from s3transfer.manager import TransferManager
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
# Simple delete
future = transfer_manager.delete('my-bucket', 'file-to-delete.txt')
future.result()
# Delete with version
future = transfer_manager.delete(
'my-bucket', 'versioned-file.txt',
extra_args={'VersionId': 'version-id-here'}
)
future.result()
# Batch delete
files_to_delete = ['file1.txt', 'file2.txt', 'file3.txt']
delete_futures = []
for filename in files_to_delete:
future = transfer_manager.delete('my-bucket', filename)
delete_futures.append(future)
# Wait for all deletions
for future in delete_futures:
try:
future.result()
print(f"Deleted: {future.meta.call_args.key}")
except Exception as e:
print(f"Delete failed: {e}")
finally:
    transfer_manager.shutdown()

The following keys are permitted in ``extra_args`` for each operation:

**ALLOWED_UPLOAD_ARGS**
- ACL: Access control list permissions
- CacheControl: Cache control directives
- ContentDisposition: Content disposition header
- ContentEncoding: Content encoding (e.g., 'gzip')
- ContentLanguage: Content language
- ContentType: MIME type of the content
- Expires: Expiration date
- GrantFullControl: Full control permissions
- GrantRead: Read permissions
- GrantReadACP: Read ACP permissions
- GrantWriteACL: Write ACL permissions
- Metadata: User-defined metadata dictionary
- RequestPayer: Request payer setting
- ServerSideEncryption: Server-side encryption method
- StorageClass: Storage class (STANDARD, REDUCED_REDUNDANCY, etc.)
- SSECustomerAlgorithm: Customer-provided encryption algorithm
- SSECustomerKey: Customer-provided encryption key
- SSECustomerKeyMD5: MD5 hash of customer encryption key
- SSEKMSKeyId: KMS key ID for encryption
- SSEKMSEncryptionContext: KMS encryption context
- Tagging: Object tags

**ALLOWED_DOWNLOAD_ARGS**
- VersionId: Specific version of the object to download
- SSECustomerAlgorithm: Customer-provided encryption algorithm
- SSECustomerKey: Customer-provided encryption key
- SSECustomerKeyMD5: MD5 hash of customer encryption key
- RequestPayer: Request payer setting

**ALLOWED_COPY_ARGS**
- MetadataDirective: Whether to copy or replace metadata
- TaggingDirective: Whether to copy or replace tags
- CopySourceIfMatch: Copy only if ETag matches
- CopySourceIfModifiedSince: Copy only if modified since date
- CopySourceIfNoneMatch: Copy only if ETag doesn't match
- CopySourceIfUnmodifiedSince: Copy only if unmodified since date
- CopySourceSSECustomerAlgorithm: Source encryption algorithm
- CopySourceSSECustomerKey: Source encryption key
- CopySourceSSECustomerKeyMD5: Source encryption key MD5

**ALLOWED_DELETE_ARGS**
- VersionId: Specific version to delete
- MFA: MFA authentication for delete operations
- RequestPayer: Request payer setting

Install with the Tessl CLI:
npx tessl i tessl/pypi-s3transfer