An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.
—
Asynchronous transfer execution using futures, coordinators, and metadata tracking for monitoring transfer progress, handling completion, and coordinating complex multi-part operations.
Future object representing a transfer request with methods for monitoring progress and retrieving results.
class TransferFuture:
    """Future representing a transfer request.

    Provides methods to check completion status, retrieve results, and
    cancel operations.  Methods here are interface stubs (docstring-only
    bodies); the implementation lives in s3transfer.futures.
    """

    def done(self) -> bool:
        """Check if the transfer is complete (non-blocking).

        Returns:
            bool: True if transfer is complete (success or failure), False otherwise
        """

    def result(self):
        """Get the transfer result, blocking until complete.

        Returns:
            Any: Transfer result (usually None for successful transfers)

        Raises:
            Exception: Any exception that occurred during transfer
            TransferNotDoneError: If called before transfer completion
        """

    def cancel(self):
        """Cancel the transfer if possible.

        Returns:
            bool: True if cancellation was successful, False otherwise
        """

    def set_exception(self, exception):
        """Set an exception on the future.

        Args:
            exception: Exception to set on the future
        """

    @property
    def meta(self) -> 'TransferMeta':
        """
        Transfer metadata object containing call arguments and status information.

        Returns:
            TransferMeta: Metadata object for this transfer
        """
Metadata container providing information about a transfer including call arguments, transfer ID, size, and custom context.
class TransferMeta:
    """Metadata about a TransferFuture containing call arguments and
    transfer information (id, size, ETag, and a user-context dict).
    Interface stub: method bodies are docstring-only.
    """

    @property
    def call_args(self):
        """The original call arguments used for the transfer.

        Returns:
            CallArgs: Object containing method arguments (bucket, key, etc.)
        """

    @property
    def transfer_id(self) -> str:
        """Unique identifier for this transfer.

        Returns:
            str: Unique transfer ID
        """

    @property
    def size(self) -> Optional[int]:
        """Total size of the transfer in bytes (if known).

        Returns:
            int or None: Transfer size in bytes, None if unknown
        """

    @property
    def user_context(self) -> Dict[str, Any]:
        """User-defined context dictionary for storing custom data.

        Returns:
            dict: User context dictionary
        """

    @property
    def etag(self) -> Optional[str]:
        """ETag of the S3 object (if available).

        Returns:
            str or None: Object ETag, None if not available
        """

    def provide_transfer_size(self, size: int):
        """Provide the total transfer size for progress tracking.

        Args:
            size (int): Total size in bytes
        """

    def provide_object_etag(self, etag: str):
        """
        Provide the object ETag.

        Args:
            etag (str): Object ETag value
        """
Central coordinator managing transfer execution, associated futures, and cleanup operations.
class TransferCoordinator:
    """Coordinates transfer execution and manages associated futures.

    Handles task submission, result/exception setting, cancellation, and
    cleanup operations.  Interface stub: method bodies are docstring-only.

    Args:
        transfer_id: Optional transfer identifier (default: None)
    """

    def __init__(self, transfer_id=None): ...

    def set_result(self, result):
        """Set the transfer result.

        Args:
            result: Result value for the transfer
        """

    def set_exception(self, exception, override=False):
        """Set an exception for the transfer.

        Args:
            exception: Exception that occurred during transfer
            override (bool): If True, override any existing state (default: False)
        """

    def result(self):
        """Get the transfer result, blocking until complete.

        Returns:
            Any: Transfer result

        Raises:
            Exception: Any exception that occurred during transfer
        """

    def cancel(self, msg: str = '', exc_type=CancelledError):
        """Cancel the transfer.

        Args:
            msg (str): Cancellation message (default: '')
            exc_type: Type of exception to set for cancellation
                (default: CancelledError)
        """

    def submit(self, executor, task, tag=None):
        """Submit a task for execution.

        Args:
            executor: Executor to submit task to
            task: Callable task to execute
            tag: TaskTag to associate with the submitted task (optional)

        Returns:
            concurrent.futures.Future: Future object for the submitted task
        """

    def done(self) -> bool:
        """Check if the transfer is complete.

        Returns:
            bool: True if complete, False otherwise
        """

    def add_done_callback(self, function, *args, **kwargs):
        """Add a callback to be called when transfer completes.

        Args:
            function: Callback function to call on completion
            *args: Additional positional arguments to pass to callback
            **kwargs: Additional keyword arguments to pass to callback
        """

    def add_failure_cleanup(self, function, *args, **kwargs):
        """Add a cleanup function to be called if transfer fails.

        Args:
            function: Function to call for cleanup on failure
            *args: Additional positional arguments to pass to cleanup function
            **kwargs: Additional keyword arguments to pass to cleanup function
        """

    def announce_done(self):
        """Announce that the transfer is complete and trigger callbacks."""

    def set_status_to_queued(self):
        """Set the TransferFuture's status to queued."""

    def set_status_to_running(self):
        """Set the TransferFuture's status to running."""

    def add_associated_future(self, future):
        """Add a future to be associated with the TransferFuture.

        Args:
            future: Future object to associate with this coordinator
        """

    def remove_associated_future(self, future):
        """Remove a future's association to the TransferFuture.

        Args:
            future: Future object to disassociate from this coordinator
        """

    @property
    def exception(self):
        """Exception that occurred during transfer (if any).

        Returns:
            Exception or None: Transfer exception, None if no exception
        """

    @property
    def associated_futures(self) -> Set:
        """Set of futures associated with this coordinator.

        Returns:
            set: Set of associated Future objects
        """

    @property
    def failure_cleanups(self) -> List:
        """List of cleanup functions to call on failure.

        Returns:
            list: List of cleanup functions
        """

    @property
    def status(self) -> str:
        """
        Current status of the transfer with detailed state information.

        Returns:
            str: Status string with specific states:
                - 'not-started': Has yet to start, can be cancelled immediately
                - 'queued': SubmissionTask is about to submit tasks
                - 'running': Is in progress (SubmissionTask executing)
                - 'cancelled': Was cancelled
                - 'failed': An exception other than CancelledError was thrown
                - 'success': No exceptions were thrown and is done
        """
Executor with bounded task submission queue to prevent unlimited memory growth during high-volume operations.
class BoundedExecutor:
    """Executor with bounded task submission queue.

    Prevents unlimited memory growth by blocking task submission when the
    queue is full.  Interface stub: method bodies are docstring-only.

    Args:
        executor: Underlying executor that runs the submitted tasks.
        max_size (int): Bound on the submission queue size.
        tag_semaphores: Optional mapping related to task tags
            (presumably per-tag semaphores; confirm in implementation).
    """

    def __init__(self, executor, max_size: int, tag_semaphores=None): ...

    def submit(self, fn, *args, **kwargs):
        """Submit a task for execution, blocking if queue is full.

        Args:
            fn: Function to execute
            *args: Positional arguments for function
            **kwargs: Keyword arguments for function

        Returns:
            Future: Future object for the submitted task
        """

    def shutdown(self, wait: bool = True):
        """
        Shutdown the executor.

        Args:
            wait (bool): Whether to wait for completion
        """
Wrapper around concurrent.futures.Future providing consistent interface for transfer operations.
class ExecutorFuture:
    """Wrapper around concurrent.futures.Future with additional
    functionality.  Interface stub: method bodies are docstring-only.

    Args:
        future: The concurrent.futures.Future instance to wrap.
    """

    def __init__(self, future): ...

    def result(self):
        """Get result from the wrapped future.

        Returns:
            Any: Future result
        """

    def add_done_callback(self, fn):
        """Add callback to be called when future completes.

        Args:
            fn: Callback function
        """

    def done(self) -> bool:
        """
        Check if future is complete.

        Returns:
            bool: True if complete, False otherwise
        """
from s3transfer.manager import TransferManager
import boto3

# Example: basic upload with completion checks.
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    # Start transfer and get future.  Keep the file open until the
    # transfer finishes: the upload reads from it asynchronously, so
    # waiting on result() must happen inside the `with` block.
    with open('/tmp/file.txt', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'file.txt')

        # Check if complete (non-blocking)
        if future.done():
            print("Transfer already complete!")
        else:
            print("Transfer in progress...")

        # Wait for completion and get result
        result = future.result()  # Blocks until complete
        print("Transfer completed successfully!")

    # Access metadata
    print(f"Transfer ID: {future.meta.transfer_id}")
    print(f"Bucket: {future.meta.call_args.bucket}")
    print(f"Key: {future.meta.call_args.key}")
finally:
    transfer_manager.shutdown()

import time
from s3transfer.manager import TransferManager
import boto3  # was missing: boto3.client is used below
import os     # was missing: os.path.getsize is used below
import time   # was missing: time.sleep is used below

# Example: monitoring progress of a large upload.
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    filename = '/tmp/large_file.dat'
    file_size = os.path.getsize(filename)
    # Keep the file open while the asynchronous upload runs.
    with open(filename, 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'large_file.dat')
        # Provide size information for progress tracking
        future.meta.provide_transfer_size(file_size)

        # Monitor progress
        while not future.done():
            print(f"Transfer ID: {future.meta.transfer_id}")
            print(f"Size: {future.meta.size} bytes")
            print("Status: In progress...")
            time.sleep(1)

        # Get final result (raises if the transfer failed)
        result = future.result()
        print("Upload completed!")

    # Check if ETag is available
    if future.meta.etag:
        print(f"Object ETag: {future.meta.etag}")
finally:
    transfer_manager.shutdown()

from s3transfer.manager import TransferManager
import concurrent.futures
import boto3  # was missing: boto3.client is used below
import os     # was missing: os.path.basename is used below
import time   # was missing: time.sleep is used below

# Example: several concurrent uploads polled until completion.
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    # Start multiple transfers.  Handles are kept open until the
    # asynchronous uploads finish (a `with open` per file would close
    # the handle before the transfer reads it).
    upload_futures = []
    open_handles = []
    files = ['/tmp/file1.txt', '/tmp/file2.txt', '/tmp/file3.txt']
    for filename in files:
        f = open(filename, 'rb')
        open_handles.append(f)
        future = transfer_manager.upload(f, 'my-bucket', os.path.basename(filename))
        upload_futures.append(future)

    # Wait for all to complete.  `processed` tracks futures already
    # handled so each is counted exactly once (the original referenced
    # an undefined `processed` and re-counted done futures every pass).
    print(f"Started {len(upload_futures)} uploads...")
    processed = set()
    while len(processed) < len(upload_futures):
        for i, future in enumerate(upload_futures):
            if future.done() and i not in processed:
                processed.add(i)
                try:
                    result = future.result()
                    print(f"Completed: {future.meta.call_args.key}")
                except Exception as e:
                    print(f"Failed: {future.meta.call_args.key} - {e}")
        time.sleep(0.1)
    print("All transfers completed!")
finally:
    for f in open_handles:
        f.close()
    transfer_manager.shutdown()

import time
import threading
import time
from s3transfer.manager import TransferManager
import boto3  # was missing: boto3.client is used below

# Example: cancelling an in-flight transfer from another thread.
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    # Start a large transfer; keep the file open while it runs.
    with open('/tmp/very_large_file.dat', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'very_large_file.dat')

        # Cancel after 5 seconds (example)
        def cancel_transfer():
            time.sleep(5)
            print("Cancelling transfer...")
            success = future.cancel()
            print(f"Cancellation {'successful' if success else 'failed'}")

        cancel_thread = threading.Thread(target=cancel_transfer)
        cancel_thread.start()
        try:
            # This will raise an exception if cancelled
            result = future.result()
            print("Transfer completed successfully!")
        except Exception as e:
            print(f"Transfer failed or was cancelled: {e}")
        cancel_thread.join()
finally:
    transfer_manager.shutdown()

from s3transfer.manager import TransferManager
import boto3  # was missing: boto3.client is used below
import os     # was missing: os.path.getsize is used below
import time   # was missing: time.time is used below

# Example: attaching custom user context to a transfer.
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    # Keep the file open while the asynchronous upload runs.
    with open('/tmp/document.pdf', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'documents/document.pdf')

        # Add custom context information
        future.meta.user_context['upload_time'] = time.time()
        future.meta.user_context['user_id'] = 'user123'
        future.meta.user_context['department'] = 'engineering'

        # Provide additional metadata
        file_size = os.path.getsize('/tmp/document.pdf')
        future.meta.provide_transfer_size(file_size)

        # Wait for completion
        result = future.result()

    # Access custom context
    upload_time = future.meta.user_context['upload_time']
    user_id = future.meta.user_context['user_id']
    print(f"Upload completed for user {user_id} at {upload_time}")
    print(f"File size: {future.meta.size} bytes")
finally:
    transfer_manager.shutdown()

from s3transfer.manager import TransferManager
from s3transfer.exceptions import S3UploadFailedError, TransferNotDoneError
import boto3  # was missing: boto3.client is used below
import os     # was missing: os.path.basename is used below

# Example: per-file error handling across multiple uploads.
# NOTE: the original printed the literal "(unknown)" where {filename}
# belonged; the f-string placeholders are restored here.
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    # Start multiple transfers with error handling.  Handles stay open
    # until the asynchronous transfers finish.
    futures = []
    open_handles = []
    files = ['/tmp/file1.txt', '/tmp/nonexistent.txt', '/tmp/file3.txt']
    for filename in files:
        try:
            f = open(filename, 'rb')
        except FileNotFoundError:
            print(f"File not found: {filename}")
            continue
        open_handles.append(f)
        future = transfer_manager.upload(f, 'my-bucket', os.path.basename(filename))
        futures.append((future, filename))

    # Process results, catching the specific transfer exceptions first
    for future, filename in futures:
        try:
            result = future.result()
            print(f"✓ Successfully uploaded: {filename}")
        except S3UploadFailedError as e:
            print(f"✗ Upload failed for {filename}: {e}")
        except TransferNotDoneError as e:
            print(f"✗ Transfer not complete for {filename}: {e}")
        except Exception as e:
            print(f"✗ Unexpected error for {filename}: {e}")
finally:
    for f in open_handles:
        f.close()
    transfer_manager.shutdown()

from s3transfer.manager import TransferManager
from s3transfer.futures import TransferCoordinator
import boto3  # was missing: boto3.client is used below

# Example of accessing the underlying coordinator (advanced usage)
client = boto3.client('s3')
transfer_manager = TransferManager(client)
try:
    # Keep the file open while the asynchronous upload runs.
    with open('/tmp/file.txt', 'rb') as f:
        future = transfer_manager.upload(f, 'my-bucket', 'file.txt')

        # Access the underlying coordinator (note: private attribute,
        # not part of the public API — may change between versions)
        coordinator = future._coordinator

        # Add custom done callback
        def on_transfer_complete():
            print(f"Transfer {future.meta.transfer_id} completed!")
            print(f"Final status: {coordinator.status}")

        coordinator.add_done_callback(on_transfer_complete)

        # Add failure cleanup
        def cleanup_on_failure():
            print("Cleaning up after transfer failure...")

        coordinator.add_failure_cleanup(cleanup_on_failure)

        # Wait for completion
        result = future.result()
finally:
    transfer_manager.shutdown()

# Check current state
if not future.done():
print("Transfer is running or queued")
else:
try:
result = future.result()
print("Transfer completed successfully")
except Exception as e:
print(f"Transfer failed: {e}")future.result() to ensure exceptions are raisedfuture.result() in try/catch blocksfuture.done() for non-blocking status checksS3UploadFailedError, TransferNotDoneError, etc. specificallyInstall with Tessl CLI
npx tessl i tessl/pypi-s3transfer