An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.
—
High-performance transfer manager implementation using the AWS Common Runtime (CRT) for significantly improved throughput and efficiency. Provides a drop-in replacement for the regular TransferManager with automatic throughput optimization and advanced performance features.
Drop-in replacement for TransferManager using AWS Common Runtime for high-performance S3 operations with automatic throughput optimization.
class CRTTransferManager:
"""
High-performance transfer manager using AWS Common Runtime.
Args:
crt_s3_client: AWS CRT S3 client instance
crt_request_serializer: Request serializer for botocore compatibility
osutil: OSUtils instance for file operations (default: OSUtils())
"""
def __init__(self, crt_s3_client, crt_request_serializer, osutil=None): ...
def upload(self, fileobj, bucket, key, extra_args=None, subscribers=None):
"""
Upload a file-like object to S3 using CRT.
Args:
fileobj: File-like object to upload (must support read())
bucket (str): S3 bucket name
key (str): S3 object key/name
extra_args (dict, optional): Additional S3 operation arguments
subscribers (list, optional): List of subscriber objects for events
Returns:
CRTTransferFuture: Future object for tracking transfer progress
"""
def download(self, bucket, key, fileobj, extra_args=None, subscribers=None):
"""
Download an S3 object to a file-like object using CRT.
Args:
bucket (str): S3 bucket name
key (str): S3 object key/name
fileobj: File-like object to write to (must support write())
extra_args (dict, optional): Additional S3 operation arguments
subscribers (list, optional): List of subscriber objects for events
Returns:
CRTTransferFuture: Future object for tracking transfer progress
"""
def delete(self, bucket, key, extra_args=None, subscribers=None):
"""
Delete an S3 object using CRT.
Args:
bucket (str): S3 bucket name
key (str): S3 object key/name
extra_args (dict, optional): Additional S3 operation arguments
subscribers (list, optional): List of subscriber objects for events
Returns:
CRTTransferFuture: Future object for tracking deletion progress
"""
def shutdown(self, cancel=False):
"""
Shutdown the CRT transfer manager.
Args:
cancel (bool): Whether to cancel ongoing transfers (default: False)
"""
def __enter__(self):
"""Context manager entry."""
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit with automatic shutdown."""

Future object representing a CRT transfer operation, with methods for monitoring progress and retrieving results.
class CRTTransferFuture:
"""
Future representing a CRT transfer request.
"""
def done(self) -> bool:
"""
Check if the transfer is complete.
Returns:
bool: True if transfer is complete (success or failure), False otherwise
"""
def result(self, timeout=None):
"""
Get the transfer result, blocking until complete.
Args:
timeout (float, optional): Maximum time to wait for completion
Returns:
None: Returns None on successful completion
Raises:
Exception: Any exception that occurred during transfer
TimeoutError: If timeout is reached before completion
"""
def cancel(self):
"""
Cancel the transfer if possible.
Returns:
bool: True if cancellation was successful, False otherwise
"""
@property
def meta(self):
"""
Transfer metadata object containing call arguments and status information.
Returns:
CRTTransferMeta: Metadata object for this transfer
"""

Request serializer that provides compatibility between botocore and the AWS CRT S3 client.
class BotocoreCRTRequestSerializer:
"""
Serializes HTTP requests using botocore logic for CRT compatibility.
Args:
session: Botocore session instance
region_name (str): AWS region name
signature_version (str): Signature version (default: 's3v4')
"""
def __init__(self, session, region_name, signature_version='s3v4'): ...
def serialize_http_request(self, request_dict):
"""
Serialize a request dictionary to HTTP request format.
Args:
request_dict (dict): Request parameters dictionary
Returns:
dict: Serialized HTTP request
"""

Utility functions for creating and managing CRT S3 clients with optimized performance settings.
def create_s3_crt_client(
region_name,
num_threads=None,
target_throughput=None,
part_size=8388608,
use_ssl=True,
verify=None
):
"""
Create an optimized CRT S3 client.
Args:
region_name (str): AWS region name
num_threads (int, optional): Number of worker threads (defaults to CPU count)
target_throughput (int, optional): Target throughput in bytes/second (auto-detected)
part_size (int): Part size for multipart operations (default: 8MB)
use_ssl (bool): Enable SSL/TLS (default: True)
verify (bool/str, optional): Certificate verification settings
Returns:
S3Client: Configured CRT S3 client
"""
def acquire_crt_s3_process_lock():
"""
Acquire a process-level lock for CRT S3 client usage.
Prevents multiple CRT S3 clients from running simultaneously in the same process,
which can cause resource conflicts.
Returns:
Lock: Process lock object
"""

CRT support is an optional feature requiring additional dependencies:
# Install s3transfer with CRT support
pip install s3transfer[crt]
# Or install with specific botocore CRT support
pip install botocore[crt] s3transfer

Requirements: the AWS Common Runtime bindings (the awscrt package).

import boto3
from s3transfer.crt import CRTTransferManager, BotocoreCRTRequestSerializer, create_s3_crt_client
# Create CRT S3 client with performance optimizations
crt_client = create_s3_crt_client(
region_name='us-west-2',
target_throughput=10 * 1024 * 1024 * 1024, # 10 GiB/s (value is in bytes per second)
num_threads=16
)
# Create request serializer
session = boto3.Session()
serializer = BotocoreCRTRequestSerializer(session, 'us-west-2')
# Create CRT transfer manager
transfer_manager = CRTTransferManager(crt_client, serializer)
try:
# Upload a file with high performance
with open('/path/to/large-file.zip', 'rb') as f:
future = transfer_manager.upload(f, 'my-bucket', 'large-file.zip')
future.result() # Wait for completion
print("Upload completed with CRT acceleration")
finally:
transfer_manager.shutdown()

import boto3
from s3transfer.crt import CRTTransferManager, BotocoreCRTRequestSerializer, create_s3_crt_client
# Setup CRT client and serializer
crt_client = create_s3_crt_client('us-west-2')
serializer = BotocoreCRTRequestSerializer(boto3.Session(), 'us-west-2')
# Use context manager for automatic cleanup
with CRTTransferManager(crt_client, serializer) as transfer_manager:
# Download with CRT acceleration
with open('/tmp/downloaded-file.zip', 'wb') as f:
future = transfer_manager.download('my-bucket', 'large-file.zip', f)
future.result()
print("Download completed")
# Transfer manager automatically shut down

from s3transfer.crt import acquire_crt_s3_process_lock, create_s3_crt_client
# Acquire process lock to prevent conflicts
with acquire_crt_s3_process_lock():
crt_client = create_s3_crt_client('us-west-2')
# Use CRT client safely

CRT vs Regular TransferManager:
| Feature | Regular TransferManager | CRTTransferManager |
|---|---|---|
| Throughput | Good | Excellent (10+ Gbps) |
| CPU Usage | Higher | Lower |
| Memory Usage | Moderate | Optimized |
| Setup Complexity | Simple | Moderate |
| Dependencies | Minimal | CRT libraries required |
Install with Tessl CLI
npx tessl i tessl/pypi-s3transfer