An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.
---
High-performance multiprocessing-based download functionality that bypasses Python's Global Interpreter Lock (GIL) limitations for improved throughput on multi-core systems. This module provides an alternative to the thread-based TransferManager for download-only scenarios requiring maximum performance.
The main downloader class that uses multiple processes for concurrent S3 downloads, providing true parallelism and better CPU utilization compared to thread-based approaches.
class ProcessPoolDownloader:
    """Multiprocessing-based S3 downloader for high-performance downloads.

    Uses worker processes (rather than threads) so downloads are not
    limited by the GIL on multi-core systems.

    Args:
        client_kwargs (dict, optional): Arguments used to create the S3
            client inside each worker process.
        config (ProcessTransferConfig, optional): Configuration controlling
            download behavior.
    """

    def __init__(self, client_kwargs=None, config=None): ...

    def download_file(self, bucket, key, filename, extra_args=None, expected_size=None):
        """Download an S3 object to a local file using multiple processes.

        Args:
            bucket (str): S3 bucket name.
            key (str): S3 object key/name.
            filename (str): Local file path to download to.
            extra_args (dict, optional): Additional S3 operation arguments.
            expected_size (int, optional): Expected size of the object;
                providing it avoids an extra HEAD request.

        Returns:
            ProcessPoolTransferFuture: Future for tracking the download.
        """

    def shutdown(self):
        """Shut down the downloader and wait for all downloads to complete."""

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with automatic shutdown."""


# Configuration class for controlling ProcessPool downloader behavior,
# including multipart thresholds and process concurrency.
class ProcessTransferConfig:
    """Configuration for ProcessPoolDownloader with multiprocessing-specific options.

    Args:
        multipart_threshold (int): Object-size threshold above which ranged
            (multipart) downloads are used. Default: 8 MB.
        multipart_chunksize (int): Size of each download chunk. Default: 8 MB.
        max_request_processes (int): Maximum number of download processes.
            Default: 10.
    """

    # Attribute types mirror the constructor arguments.
    multipart_threshold: int
    multipart_chunksize: int
    max_request_processes: int

    def __init__(
        self,
        multipart_threshold=8 * 1024 * 1024,
        multipart_chunksize=8 * 1024 * 1024,
        max_request_processes=10,
    ): ...


# Future object representing a ProcessPool download operation, with methods
# for monitoring progress and retrieving results.
class ProcessPoolTransferFuture:
    """Future representing a ProcessPool download request."""

    def done(self) -> bool:
        """Check whether the download has finished.

        Returns:
            bool: True if the download is complete (success or failure),
            False otherwise.
        """

    def result(self):
        """Block until the download completes and return its result.

        Returns:
            None: On successful completion.

        Raises:
            Exception: Any exception that occurred during the download.
        """

    def cancel(self):
        """Cancel the download if possible.

        Returns:
            bool: True if cancellation was successful, False otherwise.
        """

    @property
    def meta(self) -> 'ProcessPoolTransferMeta':
        """Transfer metadata containing call arguments and status information.

        Returns:
            ProcessPoolTransferMeta: Metadata object for this download.
        """


# Metadata container providing information about a ProcessPool download,
# including call arguments and transfer ID.
class ProcessPoolTransferMeta:
    """Metadata about a ProcessPoolTransferFuture.

    Holds the original call arguments and identifying information for a
    transfer.
    """

    @property
    def call_args(self):
        """The original call arguments used for the download.

        Returns:
            CallArgs: Object containing the method arguments
            (bucket, key, filename, etc.).
        """

    @property
    def transfer_id(self):
        """Unique identifier for this transfer.

        Returns:
            str: Transfer ID string.
        """


# Example 1: download with a custom configuration.
import boto3
from s3transfer.processpool import ProcessPoolDownloader, ProcessTransferConfig

# Create downloader with custom configuration
config = ProcessTransferConfig(
    multipart_threshold=16 * 1024 * 1024,  # 16MB
    multipart_chunksize=8 * 1024 * 1024,   # 8MB chunks
    max_request_processes=15,              # 15 concurrent processes
)
downloader = ProcessPoolDownloader(
    client_kwargs={'region_name': 'us-west-2'},
    config=config,
)
try:
    # Download a file
    future = downloader.download_file('my-bucket', 'large-file.zip', '/tmp/downloaded-file.zip')
    # Wait for completion
    future.result()  # Blocks until complete
    print("Download completed successfully")
finally:
    # Always release the worker processes, even if the download failed.
    downloader.shutdown()

# Example 2: context manager for automatic cleanup.
from s3transfer.processpool import ProcessPoolDownloader
# Using context manager for automatic cleanup
with ProcessPoolDownloader() as downloader:
    future = downloader.download_file('my-bucket', 'data.csv', '/tmp/data.csv')
    future.result()
    print("Download completed")
# Downloader automatically shut down

# Example 3: multiple concurrent downloads.
from s3transfer.processpool import ProcessPoolDownloader
files_to_download = [
('my-bucket', 'file1.txt', '/tmp/file1.txt'),
('my-bucket', 'file2.txt', '/tmp/file2.txt'),
('my-bucket', 'file3.txt', '/tmp/file3.txt'),
]
with ProcessPoolDownloader() as downloader:
futures = []
# Start all downloads
for bucket, key, filename in files_to_download:
future = downloader.download_file(bucket, key, filename)
futures.append(future)
# Wait for all to complete
for future in futures:
future.result()
print("All downloads completed")Use ProcessPoolDownloader when:
Use TransferManager when you also need uploads, progress callbacks, or its built-in retry logic in addition to downloads.
Install with Tessl CLI
npx tessl i tessl/pypi-s3transfer