tessl/pypi-s3transfer

An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.

docs/process-pool-downloads.md

Process Pool Downloads

High-performance multiprocessing-based download functionality that bypasses Python's Global Interpreter Lock (GIL) limitations for improved throughput on multi-core systems. This module provides an alternative to the thread-based TransferManager for download-only scenarios requiring maximum performance.

Capabilities

ProcessPoolDownloader

The main downloader class that uses multiple processes for concurrent S3 downloads, providing true parallelism and better CPU utilization compared to thread-based approaches.

class ProcessPoolDownloader:
    """
    Multiprocessing-based S3 downloader for high-performance downloads.
    
    Args:
        client_kwargs (dict, optional): Arguments for creating S3 clients in each process
        config (ProcessTransferConfig, optional): Configuration for download behavior
    """
    def __init__(self, client_kwargs=None, config=None): ...
    
    def download_file(self, bucket, key, filename, extra_args=None, expected_size=None):
        """
        Download an S3 object to a local file using multiple processes.
        
        Args:
            bucket (str): S3 bucket name
            key (str): S3 object key/name
            filename (str): Local file path to download to
            extra_args (dict, optional): Additional S3 operation arguments
            expected_size (int, optional): Expected size of the object (avoids HEAD request)
        
        Returns:
            ProcessPoolTransferFuture: Future object for tracking download progress
        """
    
    def shutdown(self):
        """
        Shutdown the downloader and wait for all downloads to complete.
        """
    
    def __enter__(self):
        """Context manager entry."""
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit with automatic shutdown."""

ProcessTransferConfig

Configuration class for controlling ProcessPool downloader behavior including multipart thresholds and process concurrency.

class ProcessTransferConfig:
    """
    Configuration for ProcessPoolDownloader with multiprocessing-specific options.
    
    Args:
        multipart_threshold (int): Size threshold for ranged downloads (default: 8MB)
        multipart_chunksize (int): Size of each download chunk (default: 8MB)
        max_request_processes (int): Maximum number of download processes (default: 10)
    """
    def __init__(
        self,
        multipart_threshold=8 * 1024 * 1024,
        multipart_chunksize=8 * 1024 * 1024,
        max_request_processes=10
    ): ...
    
    multipart_threshold: int
    multipart_chunksize: int
    max_request_processes: int

ProcessPoolTransferFuture

Future object representing a ProcessPool download operation with methods for monitoring progress and retrieving results.

class ProcessPoolTransferFuture:
    """
    Future representing a ProcessPool download request.
    """
    def done(self) -> bool:
        """
        Check if the download is complete.
        
        Returns:
            bool: True if download is complete (success or failure), False otherwise
        """
    
    def result(self):
        """
        Get the download result, blocking until complete.
        
        Returns:
            None: Returns None on successful completion
        
        Raises:
            Exception: Any exception that occurred during download
        """
    
    def cancel(self):
        """
        Cancel the download if possible.
        
        Returns:
            bool: True if cancellation was successful, False otherwise
        """
    
    @property
    def meta(self) -> 'ProcessPoolTransferMeta':
        """
        Transfer metadata object containing call arguments and status information.
        
        Returns:
            ProcessPoolTransferMeta: Metadata object for this download
        """

ProcessPoolTransferMeta

Metadata container providing information about a ProcessPool download including call arguments and transfer ID.

class ProcessPoolTransferMeta:
    """
    Metadata about a ProcessPoolTransferFuture containing call arguments and transfer information.
    """
    @property
    def call_args(self):
        """
        The original call arguments used for the download.
        
        Returns:
            CallArgs: Object containing method arguments (bucket, key, filename, etc.)
        """
    
    @property
    def transfer_id(self):
        """
        Unique identifier for this transfer.
        
        Returns:
            Unique identifier assigned to this transfer
        """

Usage Examples

Basic ProcessPool Download

from s3transfer.processpool import ProcessPoolDownloader, ProcessTransferConfig

# Create downloader with custom configuration
config = ProcessTransferConfig(
    multipart_threshold=16 * 1024 * 1024,  # 16MB
    multipart_chunksize=8 * 1024 * 1024,   # 8MB chunks
    max_request_processes=15                # 15 concurrent processes
)

downloader = ProcessPoolDownloader(
    client_kwargs={'region_name': 'us-west-2'},
    config=config
)

try:
    # Download a file
    future = downloader.download_file('my-bucket', 'large-file.zip', '/tmp/downloaded-file.zip')
    
    # Wait for completion
    future.result()  # Blocks until complete
    print("Download completed successfully")
    
finally:
    downloader.shutdown()

Context Manager Usage

from s3transfer.processpool import ProcessPoolDownloader

# Using context manager for automatic cleanup
with ProcessPoolDownloader() as downloader:
    future = downloader.download_file('my-bucket', 'data.csv', '/tmp/data.csv')
    future.result()
    print("Download completed")
# Downloader automatically shut down

Multiple Concurrent Downloads

from s3transfer.processpool import ProcessPoolDownloader

files_to_download = [
    ('my-bucket', 'file1.txt', '/tmp/file1.txt'),
    ('my-bucket', 'file2.txt', '/tmp/file2.txt'),
    ('my-bucket', 'file3.txt', '/tmp/file3.txt'),
]

with ProcessPoolDownloader() as downloader:
    futures = []
    
    # Start all downloads
    for bucket, key, filename in files_to_download:
        future = downloader.download_file(bucket, key, filename)
        futures.append(future)
    
    # Wait for all to complete
    for future in futures:
        future.result()
    
    print("All downloads completed")

Performance Considerations

When to Use ProcessPool vs TransferManager

Use ProcessPoolDownloader when:

  • Downloading many files concurrently
  • Downloading very large files requiring maximum throughput
  • CPU resources are available (multi-core systems)
  • Download-only operations (no uploads or copies needed)
  • Python GIL is a bottleneck in your application

Use TransferManager when:

  • Mixed operations (uploads, downloads, copies) are needed
  • Lower memory overhead is important
  • Simpler threading model is preferred
  • Working with smaller files or fewer concurrent operations

Memory and Resource Usage

  • ProcessPool uses more memory due to separate process overhead
  • Each process maintains its own S3 client and connection pool
  • Better CPU utilization on multi-core systems
  • Higher throughput for I/O intensive workloads
  • Process isolation provides better fault tolerance

Configuration Tuning

  • multipart_threshold: Objects at or above this size are downloaded in ranged chunks; lower values split more objects, increasing parallelism at the cost of extra request overhead
  • multipart_chunksize: Smaller chunks provide finer-grained parallelism; larger chunks reduce per-request overhead
  • max_request_processes: Typically set to match or slightly exceed the CPU core count

Install with Tessl CLI

npx tessl i tessl/pypi-s3transfer
