tessl/pypi-s3transfer

An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.

Future-based Coordination

Asynchronous transfer execution using futures, coordinators, and metadata tracking for monitoring transfer progress, handling completion, and coordinating complex multi-part operations.

Capabilities

TransferFuture

Future object representing a transfer request with methods for monitoring progress and retrieving results.

class TransferFuture:
    """
    Future representing a transfer request.
    
    Provides methods to check completion status, retrieve results, and cancel operations.
    """
    def done(self) -> bool:
        """
        Check if the transfer is complete.
        
        Returns:
            bool: True if transfer is complete (success or failure), False otherwise
        """
    
    def result(self):
        """
        Get the transfer result, blocking until complete.
        
        Returns:
            Any: Transfer result (usually None for successful transfers)
        
        Raises:
            Exception: Any exception that occurred during transfer,
                including CancelledError if the transfer was cancelled
        """
    
    def cancel(self):
        """
        Request cancellation of the transfer.
        
        Returns:
            None: Use done() or result() to observe the outcome
        """
    
    def set_exception(self, exception):
        """
        Set an exception on the future, overriding its final result.
        
        Args:
            exception: Exception to set on the future
        
        Raises:
            TransferNotDoneError: If called before the transfer is complete
        """
    
    @property
    def meta(self) -> 'TransferMeta':
        """
        Transfer metadata object containing call arguments and status information.
        
        Returns:
            TransferMeta: Metadata object for this transfer
        """

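The `result()` signature above takes no timeout argument, so a caller that wants a bounded wait has to poll `done()` instead. The helper below is a minimal sketch of that pattern. `wait_for_transfer` and its parameters are hypothetical names, not part of s3transfer, and the function works with any object that exposes `done()`.

```python
import time


def wait_for_transfer(future, timeout, poll_interval=0.1):
    """Poll future.done() until it is true or `timeout` seconds elapse.

    Hypothetical helper: returns True if the future finished in time,
    False if the deadline passed first. It never calls result().
    """
    deadline = time.monotonic() + timeout
    while not future.done():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True
```

A caller would typically follow a `True` return with `future.result()` to surface any exception, and a `False` return with `future.cancel()` or another wait.
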
TransferMeta

Metadata container providing information about a transfer including call arguments, transfer ID, size, and custom context.

class TransferMeta:
    """
    Metadata about a TransferFuture containing call arguments and transfer information.
    """
    @property
    def call_args(self):
        """
        The original call arguments used for the transfer.
        
        Returns:
            CallArgs: Object containing method arguments (bucket, key, etc.)
        """
    
    @property
    def transfer_id(self):
        """
        Unique identifier for this transfer.
        
        Returns:
            int: Transfer ID (TransferManager assigns an incrementing counter value)
        """
    
    @property
    def size(self) -> Optional[int]:
        """
        Total size of the transfer in bytes (if known).
        
        Returns:
            int or None: Transfer size in bytes, None if unknown
        """
    
    @property
    def user_context(self) -> Dict[str, Any]:
        """
        User-defined context dictionary for storing custom data.
        
        Returns:
            dict: User context dictionary
        """
    
    @property
    def etag(self) -> Optional[str]:
        """
        ETag of the S3 object (if available).
        
        Returns:
            str or None: Object ETag, None if not available
        """
    
    def provide_transfer_size(self, size: int):
        """
        Provide the total transfer size for progress tracking.
        
        Args:
            size (int): Total size in bytes
        """
    
    def provide_object_etag(self, etag: str):
        """
        Provide the object ETag.
        
        Args:
            etag (str): Object ETag value
        """

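For logging or debugging, it can help to flatten the metadata properties listed above into a plain dictionary. The following is a minimal sketch under that assumption. `describe_transfer` is a hypothetical helper, and it reads `bucket` and `key` defensively with `getattr` in case a given call-args object does not carry them.

```python
def describe_transfer(future):
    """Return a plain dict summarizing a transfer future's metadata.

    Hypothetical helper: relies only on the `meta` properties documented
    above (transfer_id, call_args, size, user_context).
    """
    meta = future.meta
    call_args = meta.call_args
    return {
        'transfer_id': meta.transfer_id,
        'bucket': getattr(call_args, 'bucket', None),
        'key': getattr(call_args, 'key', None),
        'size': meta.size,  # None until the size is known
        'user_context': dict(meta.user_context),  # copy, so callers cannot mutate it
    }
```
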
TransferCoordinator

Central coordinator managing transfer execution, associated futures, and cleanup operations.

class TransferCoordinator:
    """
    Coordinates transfer execution and manages associated futures.
    
    Handles task submission, result/exception setting, cancellation, and cleanup operations.
    
    Args:
        transfer_id: Optional transfer identifier (default: None)
    """
    def __init__(self, transfer_id=None): ...
    def set_result(self, result):
        """
        Set the transfer result.
        
        Args:
            result: Result value for the transfer
        """
    
    def set_exception(self, exception, override=False):
        """
        Set an exception for the transfer.
        
        Args:
            exception: Exception that occurred during transfer
            override (bool): If True, override any existing state (default: False)
        """
    
    def result(self):
        """
        Get the transfer result, blocking until complete.
        
        Returns:
            Any: Transfer result
        
        Raises:
            Exception: Any exception that occurred during transfer
        """
    
    def cancel(self, msg: str = '', exc_type=CancelledError):
        """
        Cancel the transfer.
        
        Args:
            msg (str): Cancellation message (default: '')
            exc_type: Type of exception to set for cancellation (default: CancelledError)
        """
    
    def submit(self, executor, task, tag=None):
        """
        Submit a task for execution.
        
        Args:
            executor: Executor to submit task to
            task: Callable task to execute
            tag: TaskTag to associate with the submitted task (optional)
        
        Returns:
            Future: Future object for the submitted task, as returned by
                the executor's submit() (an ExecutorFuture for a BoundedExecutor)
        """
    
    def done(self) -> bool:
        """
        Check if the transfer is complete.
        
        Returns:
            bool: True if complete, False otherwise
        """
    
    def add_done_callback(self, function, *args, **kwargs):
        """
        Add a callback to be called when transfer completes.
        
        Args:
            function: Callback function to call on completion
            *args: Additional positional arguments to pass to callback
            **kwargs: Additional keyword arguments to pass to callback
        """
    
    def add_failure_cleanup(self, function, *args, **kwargs):
        """
        Add a cleanup function to be called if transfer fails.
        
        Args:
            function: Function to call for cleanup on failure
            *args: Additional positional arguments to pass to cleanup function
            **kwargs: Additional keyword arguments to pass to cleanup function
        """
    
    def announce_done(self):
        """
        Announce that the transfer is complete and trigger callbacks.
        """
    
    def set_status_to_queued(self):
        """
        Set the TransferFuture's status to queued.
        """
    
    def set_status_to_running(self):
        """
        Set the TransferFuture's status to running.
        """
    
    def add_associated_future(self, future):
        """
        Add a future to be associated with the TransferFuture.
        
        Args:
            future: Future object to associate with this coordinator
        """
    
    def remove_associated_future(self, future):
        """
        Remove a future's association to the TransferFuture.
        
        Args:
            future: Future object to disassociate from this coordinator
        """
    
    @property
    def exception(self):
        """
        Exception that occurred during transfer (if any).
        
        Returns:
            Exception or None: Transfer exception, None if no exception
        """
    
    @property
    def associated_futures(self) -> Set:
        """
        Set of futures associated with this coordinator.
        
        Returns:
            set: Set of associated Future objects
        """
    
    @property
    def failure_cleanups(self) -> List:
        """
        List of cleanup functions to call on failure.
        
        Returns:
            list: List of cleanup functions
        """
    
    @property
    def status(self) -> str:
        """
        Current status of the transfer with detailed state information.
        
        Returns:
            str: Status string with specific states:
                - 'not-started': Has yet to start, can be cancelled immediately
                - 'queued': SubmissionTask is about to submit tasks
                - 'running': Is in progress (SubmissionTask executing)
                - 'cancelled': Was cancelled
                - 'failed': An exception other than CancelledError was thrown
                - 'success': No exceptions were thrown and is done
        """

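The six status strings listed above fall into two groups: states in which the transfer can still change, and final states. A small sketch of that grouping follows. The constant and function names are hypothetical, and the grouping is inferred from the status descriptions rather than taken from the library.

```python
# Status strings as documented for TransferCoordinator.status.
PENDING_STATUSES = frozenset({'not-started', 'queued', 'running'})
TERMINAL_STATUSES = frozenset({'cancelled', 'failed', 'success'})


def is_terminal(status):
    """Return True for a final status, False for one that can still change."""
    if status in TERMINAL_STATUSES:
        return True
    if status in PENDING_STATUSES:
        return False
    raise ValueError(f"Unknown transfer status: {status!r}")
```
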
BoundedExecutor

Executor with bounded task submission queue to prevent unlimited memory growth during high-volume operations.

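class BoundedExecutor:
    """
    Executor with a bounded number of in-flight tasks.
    
    Prevents unlimited memory growth by blocking task submission once
    the maximum number of submitted-but-unfinished tasks is reached.
    
    Args:
        max_size (int): Maximum number of in-flight futures
        max_num_threads (int): Maximum number of threads the executor uses
        tag_semaphores (dict): Optional mapping of TaskTag to the semaphore
            that limits tasks submitted with that tag
        executor_cls: Optional executor class to use instead of the default
    """
    def __init__(self, max_size: int, max_num_threads: int,
                 tag_semaphores=None, executor_cls=None): ...
    
    def submit(self, task, tag=None, block=True):
        """
        Submit a task for execution.
        
        Args:
            task: Task to run
            tag: TaskTag selecting the semaphore to use (optional)
            block (bool): If True, wait until a slot is free; if False,
                raise an error when no slot is available
        
        Returns:
            ExecutorFuture: Future object for the submitted task
        """
    
    def shutdown(self, wait: bool = True):
        """
        Shutdown the executor.
        
        Args:
            wait (bool): Whether to wait for completion
        """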

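The bounding idea can be illustrated with the standard library alone: a semaphore is acquired before each submission and released when the task finishes, so submission blocks once the limit is reached. The class below is a simplified stand-in built on `concurrent.futures`, not s3transfer's implementation. `SimpleBoundedExecutor` and its parameters are hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class SimpleBoundedExecutor:
    """Simplified stand-in: at most `max_size` tasks queued or running."""

    def __init__(self, max_size, max_workers):
        self._semaphore = threading.Semaphore(max_size)
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn, *args, **kwargs):
        # Blocks here once `max_size` tasks are outstanding.
        self._semaphore.acquire()
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            # Submission failed, so give the slot back.
            self._semaphore.release()
            raise
        # Free the slot when the task finishes, whether it succeeded or not.
        future.add_done_callback(lambda _finished: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)
```

The producer thread is slowed to the pace of the workers, which is what keeps the backlog, and therefore memory use, bounded.
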
ExecutorFuture

Wrapper around concurrent.futures.Future providing consistent interface for transfer operations.

class ExecutorFuture:
    """
    Wrapper around concurrent.futures.Future with additional functionality.
    """
    def __init__(self, future): ...
    
    def result(self):
        """
        Get result from the wrapped future.
        
        Returns:
            Any: Future result
        """
    
    def add_done_callback(self, fn):
        """
        Add callback to be called when future completes.
        
        Args:
            fn: Callback function
        """
    
    def done(self) -> bool:
        """
        Check if future is complete.
        
        Returns:
            bool: True if complete, False otherwise
        """

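One reason to wrap a `concurrent.futures.Future` is to change the callback convention: the standard library passes the finished future to each done-callback, whereas a wrapper can accept callbacks that take no arguments. The sketch below shows that adaptation. It assumes zero-argument callbacks are the goal, which should be checked against the installed s3transfer version, and `ZeroArgCallbackFuture` is a hypothetical name.

```python
from concurrent.futures import Future


class ZeroArgCallbackFuture:
    """Illustrative wrapper that invokes done-callbacks with no arguments."""

    def __init__(self, future):
        self._future = future

    def result(self):
        return self._future.result()

    def add_done_callback(self, fn):
        # concurrent.futures passes the finished future to the callback;
        # drop that argument so `fn` can be a plain zero-argument callable.
        self._future.add_done_callback(lambda _finished: fn())

    def done(self):
        return self._future.done()
```
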
Usage Examples

Basic Future Handling

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start transfer and get future. Passing the filename lets s3transfer
    # open and close the file itself; a file object passed instead must
    # stay open until the transfer has finished.
    future = transfer_manager.upload('/tmp/file.txt', 'my-bucket', 'file.txt')
    
    # Check if complete (non-blocking)
    if future.done():
        print("Transfer already complete!")
    else:
        print("Transfer in progress...")
    
    # Wait for completion; raises if the transfer failed
    future.result()  # Blocks until complete
    print("Transfer completed successfully!")
    
    # Access metadata
    print(f"Transfer ID: {future.meta.transfer_id}")
    print(f"Bucket: {future.meta.call_args.bucket}")
    print(f"Key: {future.meta.call_args.key}")

finally:
    transfer_manager.shutdown()

Progress Tracking with Size Information

import os
import time

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    filename = '/tmp/large_file.dat'
    file_size = os.path.getsize(filename)
    
    # Pass the filename so the file stays available for the whole transfer
    future = transfer_manager.upload(filename, 'my-bucket', 'large_file.dat')
    
    # Provide size information for progress tracking
    future.meta.provide_transfer_size(file_size)
    
    # Monitor progress
    while not future.done():
        print(f"Transfer ID: {future.meta.transfer_id}")
        print(f"Size: {future.meta.size} bytes")
        print("Status: In progress...")
        time.sleep(1)
    
    # Get final result; raises if the upload failed
    future.result()
    print("Upload completed!")
    
    # Check if ETag is available
    if future.meta.etag:
        print(f"Object ETag: {future.meta.etag}")

finally:
    transfer_manager.shutdown()

Multiple Concurrent Operations

import os
import time

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start multiple transfers (filenames are passed so that each file
    # stays available until its transfer finishes)
    files = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']
    upload_futures = [
        transfer_manager.upload(filename, 'my-bucket', os.path.basename(filename))
        for filename in files
    ]
    
    print(f"Started {len(upload_futures)} uploads...")
    
    # Poll until every future has been handled exactly once
    processed = set()
    while len(processed) < len(upload_futures):
        for i, future in enumerate(upload_futures):
            if i in processed or not future.done():
                continue
            processed.add(i)
            try:
                future.result()
                print(f"Completed: {future.meta.call_args.key}")
            except Exception as e:
                print(f"Failed: {future.meta.call_args.key} - {e}")
        time.sleep(0.1)
    
    print("All transfers finished!")

finally:
    transfer_manager.shutdown()

Cancellation Handling

import threading
import time

import boto3
from s3transfer.exceptions import CancelledError
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start a large transfer
    future = transfer_manager.upload(
        '/tmp/very_large_file.dat', 'my-bucket', 'very_large_file.dat')
    
    # Cancel after 5 seconds (example)
    def cancel_transfer():
        time.sleep(5)
        if not future.done():
            print("Cancelling transfer...")
            # cancel() returns None; result() below reports the outcome
            future.cancel()
    
    cancel_thread = threading.Thread(target=cancel_transfer)
    cancel_thread.start()
    
    try:
        # Raises CancelledError if the transfer was cancelled
        future.result()
        print("Transfer completed successfully!")
    except CancelledError as e:
        print(f"Transfer was cancelled: {e}")
    except Exception as e:
        print(f"Transfer failed: {e}")
    
    cancel_thread.join()

finally:
    transfer_manager.shutdown()

Custom Context and Metadata

import os
import time

import boto3
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    filename = '/tmp/document.pdf'
    future = transfer_manager.upload(filename, 'my-bucket', 'documents/document.pdf')
    
    # Add custom context information
    future.meta.user_context['upload_time'] = time.time()
    future.meta.user_context['user_id'] = 'user123'
    future.meta.user_context['department'] = 'engineering'
    
    # Provide additional metadata
    file_size = os.path.getsize(filename)
    future.meta.provide_transfer_size(file_size)
    
    # Wait for completion; raises if the upload failed
    future.result()
    
    # Access custom context
    upload_time = future.meta.user_context['upload_time']
    user_id = future.meta.user_context['user_id']
    
    print(f"Upload completed for user {user_id} at {upload_time}")
    print(f"File size: {future.meta.size} bytes")

finally:
    transfer_manager.shutdown()

Error Handling with Futures

import os

import boto3
from s3transfer.exceptions import CancelledError, S3UploadFailedError
from s3transfer.manager import TransferManager

client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    # Start multiple transfers with error handling
    futures = []
    files = ['/tmp/file1.txt', '/tmp/nonexistent.txt', '/tmp/file3.txt']
    
    for filename in files:
        # Skip missing files up front instead of failing inside the transfer
        if not os.path.isfile(filename):
            print(f"File not found: {filename}")
            continue
        future = transfer_manager.upload(filename, 'my-bucket', os.path.basename(filename))
        futures.append((future, filename))
    
    # Process results
    for future, filename in futures:
        try:
            future.result()
            print(f"✓ Successfully uploaded: {filename}")
            
        except S3UploadFailedError as e:
            print(f"✗ Upload failed for {filename}: {e}")
            
        except CancelledError as e:
            print(f"✗ Transfer cancelled for {filename}: {e}")
            
        except Exception as e:
            print(f"✗ Unexpected error for {filename}: {e}")

finally:
    transfer_manager.shutdown()

Working with TransferCoordinator

import boto3
from s3transfer.manager import TransferManager

# Example of accessing the underlying coordinator (advanced usage)
client = boto3.client('s3')
transfer_manager = TransferManager(client)

try:
    future = transfer_manager.upload('/tmp/file.txt', 'my-bucket', 'file.txt')
    
    # Access the underlying coordinator (advanced usage)
    coordinator = future._coordinator  # Note: private attribute, may change
    
    # Add custom done callback. The transfer is already underway here, so a
    # callback registered after it has finished may never run.
    def on_transfer_complete():
        print(f"Transfer {future.meta.transfer_id} completed!")
        print(f"Final status: {coordinator.status}")
    
    coordinator.add_done_callback(on_transfer_complete)
    
    # Add failure cleanup
    def cleanup_on_failure():
        print("Cleaning up after transfer failure...")
    
    coordinator.add_failure_cleanup(cleanup_on_failure)
    
    # Wait for completion; raises if the transfer failed
    future.result()

finally:
    transfer_manager.shutdown()

Future States and Lifecycle

Future States

  1. Created: Future object created, transfer queued
  2. Running: Transfer is actively executing
  3. Completed: Transfer finished successfully
  4. Failed: Transfer failed with an exception
  5. Cancelled: Transfer was cancelled before completion

State Transitions

# Check current state
if not future.done():
    print("Transfer is running or queued")
else:
    try:
        result = future.result()
        print("Transfer completed successfully")
    except Exception as e:
        print(f"Transfer failed: {e}")

Best Practices

Future Management

  1. Always call result(): Even if you don't need the return value, call future.result() so that any exception from the transfer is raised in your code
  2. Handle exceptions: Wrap future.result() in try/except blocks
  3. Don't ignore futures: Keep references to futures until completion
  4. Check done() for polling: Use future.done() for non-blocking status checks
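
The first two practices can be combined in one small helper that calls `result()` on every future and separates successes from failures. This is a minimal sketch; `collect_results` is a hypothetical name, and it works with any object exposing a blocking `result()`.

```python
def collect_results(futures):
    """Call result() on every future and return (successes, failures).

    successes is a list of (future, result) pairs and failures is a list
    of (future, exception) pairs, each in the order the futures were given.
    """
    successes, failures = [], []
    for future in futures:
        try:
            successes.append((future, future.result()))
        except Exception as exc:  # KeyboardInterrupt still propagates
            failures.append((future, exc))
    return successes, failures
```

Because every future is visited, no failure is silently dropped, and the failure list keeps the future alongside the exception so that its `meta` can be logged.
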

Resource Management

  1. Limit concurrent futures: Don't create unlimited futures without waiting for completion
  2. Clean up on failure: Use failure cleanup functions for resource cleanup
  3. Cancel when appropriate: Cancel futures during shutdown or error conditions
  4. Monitor memory usage: Large numbers of futures can consume significant memory
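
One way to limit concurrent futures is a sliding window: before starting a new transfer, wait for the oldest outstanding one once the window is full. The sketch below assumes a caller-supplied `start_transfer(item)` callable that returns a future. Both that callable and `run_windowed` are hypothetical names.

```python
from collections import deque


def run_windowed(start_transfer, items, max_in_flight):
    """Start one transfer per item with at most `max_in_flight` outstanding.

    Returns a list of (item, outcome) pairs in submission order, where
    outcome is the result on success or the exception on failure.
    """
    if max_in_flight < 1:
        raise ValueError("max_in_flight must be at least 1")
    pending = deque()
    outcomes = []

    def settle_oldest():
        item, future = pending.popleft()
        try:
            outcomes.append((item, future.result()))
        except Exception as exc:
            outcomes.append((item, exc))

    for item in items:
        if len(pending) >= max_in_flight:
            settle_oldest()  # blocks until the oldest transfer finishes
        pending.append((item, start_transfer(item)))
    while pending:
        settle_oldest()
    return outcomes
```

With a transfer manager, `start_transfer` could be a small function that calls `upload` with a bucket and key derived from the item.
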

Error Handling

  1. Catch specific exceptions: Handle S3UploadFailedError, CancelledError, etc. specifically before falling back to a generic handler
  2. Use coordinator callbacks: Add failure cleanup functions for automatic resource management
  3. Log transfer IDs: Include transfer IDs in error messages for debugging
  4. Implement retry logic: A finished future cannot be restarted, so retry a failed transfer by submitting a new one and waiting on the new future
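
A retry loop around futures has to start a fresh transfer on each attempt, since a completed future keeps its outcome. The following is a minimal sketch with exponential backoff. `transfer_with_retry`, `start_transfer`, and the default values are hypothetical choices, not s3transfer APIs.

```python
import time


def transfer_with_retry(start_transfer, max_attempts=3, base_delay=1.0,
                        retry_on=(Exception,)):
    """Run start_transfer() until its future succeeds or attempts run out.

    start_transfer must return a new future on every call. The last
    exception is re-raised if every attempt fails.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        future = start_transfer()
        try:
            return future.result()
        except retry_on:
            if attempt == max_attempts:
                raise
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            time.sleep(base_delay * 2 ** (attempt - 1))
```

Narrowing `retry_on` to the exception types that are worth retrying avoids repeating transfers that fail for permanent reasons, such as a missing local file.
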
