An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.
—
File handling utilities including chunk readers, progress streams, OS operations, and callback support for monitoring transfer progress and managing file operations efficiently.
Enhanced file chunk reader that provides progress callbacks, transfer state management, and efficient reading of file segments.
class ReadFileChunk:
    """
    File-like object for reading chunks of files with progress callbacks and transfer state management.

    Args:
        fileobj: File object to read from
        start_byte (int): Starting position in file
        chunk_size (int): Maximum chunk size to read
        full_file_size (int): Total file size
        callback (callable, optional): Progress callback function(bytes_read)
        enable_callback (bool): Whether to enable callbacks initially
    """

    def __init__(
        self,
        fileobj,
        start_byte: int,
        chunk_size: int,
        full_file_size: int,
        callback=None,
        enable_callback: bool = True
    ): ...

    # Alternate constructor: opens the named file itself instead of taking
    # an already-open file object.
    @classmethod
    def from_filename(
        cls,
        filename: str,
        start_byte: int,
        chunk_size: int,
        callback=None,
        enable_callback: bool = True
    ):
        """
        Create ReadFileChunk from filename.

        Args:
            filename (str): Path to file
            start_byte (int): Starting position in file
            chunk_size (int): Maximum chunk size to read
            callback (callable, optional): Progress callback function
            enable_callback (bool): Whether to enable callbacks initially

        Returns:
            ReadFileChunk: New instance
        """

    def read(self, amount=None) -> bytes:
        """
        Read data from chunk.

        Args:
            amount (int, optional): Number of bytes to read (default: all remaining)

        Returns:
            bytes: Data read from chunk
        """

    # NOTE(review): per the docstrings below, seek()/tell() positions are
    # relative to the chunk's start_byte, not to the underlying file.
    def seek(self, where: int):
        """
        Seek to position within chunk.

        Args:
            where (int): Position to seek to (relative to chunk start)
        """

    def tell(self) -> int:
        """
        Get current position within chunk.

        Returns:
            int: Current position relative to chunk start
        """

    def close(self):
        """Close the underlying file object."""

    def signal_transferring(self):
        """Signal that transfer is currently active."""

    def signal_not_transferring(self):
        """Signal that transfer is not currently active."""

    def enable_callback(self):
        """Enable progress callbacks."""

    def disable_callback(self):
        """Disable progress callbacks."""

    def __len__(self) -> int:
        """Return the size of this chunk."""

    # Context-manager support: `with ReadFileChunk.from_filename(...) as chunk:`
    # guarantees close() on exit.
    def __enter__(self):
        """Context manager entry."""
        return self
def __exit__(self, *args, **kwargs):
"""Context manager exit."""
        self.close()

Wrapper for read-only streams that adds progress callback functionality for monitoring data consumption.
class StreamReaderProgress:
"""
Wrapper for read-only streams that adds progress callbacks.
Args:
stream: Stream to wrap (must support read())
callback (callable, optional): Progress callback function(bytes_read)
"""
def __init__(self, stream, callback=None): ...
def read(self, *args, **kwargs) -> bytes:
"""
Read from stream with progress tracking.
Args:
*args: Arguments passed to underlying stream.read()
**kwargs: Keyword arguments passed to underlying stream.read()
Returns:
bytes: Data read from stream
        """

File-like object that defers opening the actual file until first access, useful for preparing file operations without immediate resource consumption.
class DeferredOpenFile:
"""
File-like object that defers opening until first access.
Args:
filename (str): Path to file
mode (str): File open mode
open_func (callable, optional): Function to use for opening file
"""
def __init__(self, filename: str, mode: str, open_func=None): ...
def read(self, amount=None) -> bytes:
"""
Read from file, opening if necessary.
Args:
amount (int, optional): Number of bytes to read
Returns:
bytes: Data read from file
"""
def write(self, data: bytes) -> int:
"""
Write to file, opening if necessary.
Args:
data (bytes): Data to write
Returns:
int: Number of bytes written
"""
def seek(self, where: int, whence: int = 0):
"""
Seek to position in file, opening if necessary.
Args:
where (int): Position to seek to
whence (int): Reference point for position
"""
def tell(self) -> int:
"""
Get current position in file, opening if necessary.
Returns:
int: Current file position
"""
def close(self):
"""Close file if open."""
@property
def name(self) -> str:
"""
Get filename.
Returns:
str: Filename
        """

Enhanced OS utility functions providing file operations, size queries, and file chunk reader creation with comprehensive error handling.
class OSUtils:
"""
Enhanced OS utility functions for file operations.
"""
def get_file_size(self, filename: str) -> int:
"""
Get file size in bytes.
Args:
filename (str): Path to file
Returns:
int: File size in bytes
Raises:
OSError: If file cannot be accessed
"""
def open_file_chunk_reader(self, filename: str, start_byte: int, size: int, callbacks):
"""
Open a file chunk reader with progress callback.
Args:
filename (str): Path to file
start_byte (int): Starting position in file
size (int): Size of chunk to read
callbacks: Progress callback functions (list or single callback)
Returns:
ReadFileChunk: File chunk reader instance
"""
def open_file_chunk_reader_from_fileobj(self, fileobj, chunk_size, full_file_size, callbacks, close_callbacks=None):
"""
Open a file chunk reader from existing file object.
Args:
fileobj: File object to read from
chunk_size: Size of chunk to read
full_file_size: Full size of the file
callbacks: Progress callback functions (list or single callback)
close_callbacks: Callbacks to execute when closing (optional)
Returns:
ReadFileChunk: File chunk reader instance
"""
def open(self, filename: str, mode: str):
"""
Open a file.
Args:
filename (str): Path to file
mode (str): File open mode
Returns:
File object
"""
def remove_file(self, filename: str):
"""
Remove a file (no-op if doesn't exist).
Args:
filename (str): Path to file to remove
"""
def rename_file(self, current_filename: str, new_filename: str):
"""
Rename a file.
Args:
current_filename (str): Current filename
new_filename (str): New filename
"""
def is_special_file(self, filename: str) -> bool:
"""
Check if file is a special file (device, pipe, etc.).
Args:
filename (str): Path to file
Returns:
bool: True if file is special, False otherwise
"""
def get_temp_filename(self, filename: str) -> str:
"""
Get a temporary filename based on the given filename.
Args:
filename (str): Base filename
Returns:
str: Temporary filename
"""
def allocate(self, filename: str, size: int):
"""
Allocate space for a file.
Args:
filename (str): Path to file
size (int): Size in bytes to allocate
        """

Additional utility classes for managing callbacks, function containers, and semaphores.
class CallArgs:
    """
    Records and stores call arguments as attributes.

    Args:
        **kwargs: Keyword arguments to store as attributes
    """

    # Each keyword argument becomes an instance attribute, so callers can
    # later read e.g. CallArgs(bucket='b').bucket.
    def __init__(self, **kwargs): ...
class FunctionContainer:
    """
    Container for storing function with args and kwargs.

    Args:
        function: Function to store
        *args: Positional arguments for function
        **kwargs: Keyword arguments for function
    """

    # Bundles a deferred call (function + its arguments) so it can be
    # passed around and invoked later.
    def __init__(self, function, *args, **kwargs): ...
class CountCallbackInvoker:
    """
    Invokes callback when internal count reaches zero.

    Args:
        callback (callable): Function to call when count reaches zero
    """

    def __init__(self, callback): ...

    def increment(self):
        """Increment the counter."""

    # NOTE(review): behavior of decrement() when the count is already zero
    # is not shown here -- confirm against the implementation.
    def decrement(self):
        """Decrement the counter, calling callback if it reaches zero."""

    def finalize(self):
        """Force callback invocation regardless of count."""

    @property
    def current_count(self) -> int:
        """
        Current count value.

        Returns:
            int: Current count
        """
class TaskSemaphore:
    """
    Semaphore for coordinating task execution with tagging support.

    Args:
        capacity (int): Maximum number of permits
    """

    def __init__(self, capacity: int): ...

    def acquire(self, task_tag, blocking: bool = True):
        """
        Acquire a permit.

        Args:
            task_tag: Tag identifying the task type
            blocking (bool): Whether to block if no permits available

        Returns:
            Token: Acquire token for later release
        """

    # The token returned by acquire() must be handed back here; pair the
    # two calls in try/finally so permits are never leaked.
    def release(self, task_tag, acquire_token):
        """
        Release a permit.

        Args:
            task_tag: Tag identifying the task type
            acquire_token: Token from acquire() call
        """
class ChunksizeAdjuster:
"""
Adjusts chunk sizes to comply with S3 multipart upload limits.
"""
def adjust_chunksize(self, current_chunksize: int, file_size: int, max_parts: int = 10000) -> int:
"""
Adjust chunk size to ensure number of parts doesn't exceed limit.
Args:
current_chunksize (int): Current chunk size
file_size (int): Total file size
max_parts (int): Maximum number of parts allowed
Returns:
int: Adjusted chunk size
        """

from s3transfer.utils import ReadFileChunk
def progress_callback(bytes_read):
print(f"Read {bytes_read} bytes")
# Read a specific chunk of a file
with ReadFileChunk.from_filename(
'/tmp/large_file.dat',
start_byte=1024, # Start at byte 1024
chunk_size=8 * 1024 * 1024, # Read up to 8MB
callback=progress_callback
) as chunk:
data = chunk.read(1024) # Read 1KB
print(f"Current position: {chunk.tell()}")
chunk.seek(2048) # Seek to byte 2048 within chunk
    more_data = chunk.read()  # Read remaining data in chunk

from s3transfer.utils import StreamReaderProgress
import boto3
def download_progress(bytes_read):
print(f"Downloaded {bytes_read} bytes")
# Download with progress tracking
client = boto3.client('s3')
response = client.get_object(Bucket='my-bucket', Key='large-file.dat')
# Wrap the streaming body with progress tracking
progress_stream = StreamReaderProgress(response['Body'], download_progress)
# Read in chunks
with open('/tmp/downloaded.dat', 'wb') as f:
while True:
chunk = progress_stream.read(8192) # 8KB chunks
if not chunk:
break
        f.write(chunk)

from s3transfer.utils import DeferredOpenFile
# Create a deferred file handle; the underlying file is NOT opened yet.
# NOTE: the mode must be binary ('wb') because bytes are written below --
# text mode 'w' would raise TypeError on write(b'...').
deferred_file = DeferredOpenFile('/tmp/output.txt', 'wb')
# The file is opened lazily on first access
deferred_file.write(b'Hello, world!')  # file actually opened here
deferred_file.write(b'More data')      # file already open
print(f"Filename: {deferred_file.name}")
deferred_file.close()

from s3transfer.utils import OSUtils
import os
osutil = OSUtils()
# File size operations
filename = '/tmp/test_file.dat'
file_size = osutil.get_file_size(filename)
print(f"File size: {file_size} bytes")
# Check if file is special (device, pipe, etc.)
if osutil.is_special_file(filename):
print("File is a special file")
else:
print("File is a regular file")
# Get temporary filename
temp_filename = osutil.get_temp_filename(filename)
print(f"Temporary filename: {temp_filename}")
# Safe file operations
osutil.remove_file('/tmp/might_not_exist.txt') # No error if doesn't exist
# Allocate space for large file (on supported filesystems)
try:
osutil.allocate('/tmp/large_file.dat', 1024 * 1024 * 1024) # 1GB
print("Space allocated successfully")
except OSError as e:
    print(f"Space allocation failed: {e}")

from s3transfer.utils import ChunksizeAdjuster
adjuster = ChunksizeAdjuster()
# Adjust chunk size for large file to stay within S3 limits
file_size = 5 * 1024 * 1024 * 1024 # 5GB
current_chunk_size = 8 * 1024 * 1024 # 8MB
adjusted_size = adjuster.adjust_chunksize(
current_chunksize=current_chunk_size,
file_size=file_size,
max_parts=10000 # S3 limit
)
print(f"Original chunk size: {current_chunk_size}")
print(f"Adjusted chunk size: {adjusted_size}")
print(f"Number of parts: {file_size // adjusted_size}")

from s3transfer.utils import CountCallbackInvoker
def completion_callback():
print("All operations completed!")
# Create callback invoker that triggers when count reaches zero
invoker = CountCallbackInvoker(completion_callback)
# Simulate multiple operations
operations = ['upload1', 'upload2', 'upload3']
# Increment for each operation
for op in operations:
invoker.increment()
print(f"Started operation: {op}")
print(f"Current count: {invoker.current_count}")
# Decrement as operations complete
for op in operations:
invoker.decrement()
print(f"Completed operation: {op}, remaining: {invoker.current_count}")
# Callback is called when count reaches zero

from s3transfer.utils import TaskSemaphore
# Create semaphore for limiting concurrent operations
import threading
import time  # required: perform_operation() calls time.sleep()

semaphore = TaskSemaphore(capacity=5)  # Max 5 concurrent operations

def perform_operation(task_id):
    # Acquire permit (blocks until one is available)
    token = semaphore.acquire('upload_task')
    try:
        print(f"Performing operation {task_id}")
        # Simulate work
        time.sleep(1)
        print(f"Completed operation {task_id}")
    finally:
        # Always release permit, even on error
        semaphore.release('upload_task', token)

# Start multiple operations (only 5 will run concurrently)
threads = []
for i in range(10):
    thread = threading.Thread(target=perform_operation, args=(i,))
    threads.append(thread)
    thread.start()
# Wait for all to complete
for thread in threads:
    thread.join()

from s3transfer.utils import ReadFileChunk
import os  # needed for os.path.getsize() so the snippet is self-contained

def transfer_progress(bytes_read):
    print(f"Transfer progress: {bytes_read} bytes")

filename = '/tmp/large_upload.dat'
chunk_size = 64 * 1024 * 1024  # 64MB chunks
file_size = os.path.getsize(filename)

# Read file in chunks for multipart upload
chunks_processed = 0
start_byte = 0
while start_byte < file_size:
    with ReadFileChunk.from_filename(
        filename,
        start_byte=start_byte,
        chunk_size=chunk_size,
        callback=transfer_progress
    ) as chunk:
        # Signal that transfer is active
        chunk.signal_transferring()
        try:
            # Process chunk (simulate upload)
            data = chunk.read()
            print(f"Processing chunk {chunks_processed + 1}, size: {len(data)}")
            # Simulate upload in smaller increments; progress is reported
            # automatically via the read() callback.
            bytes_uploaded = 0
            while bytes_uploaded < len(data):
                increment = min(8192, len(data) - bytes_uploaded)
                bytes_uploaded += increment
        finally:
            # Signal transfer is no longer active
            chunk.signal_not_transferring()
    chunks_processed += 1
    start_byte += chunk_size
print(f"Processed {chunks_processed} chunks total")

def get_callbacks(subscribers, callback_type: str) -> List[callable]:
"""
Extract callbacks of a specific type from subscriber objects.
Args:
subscribers: List of subscriber objects
callback_type (str): Type of callback to extract
Returns:
list: List of callback functions
"""
def invoke_progress_callbacks(callbacks: List[callable], bytes_transferred: int):
"""
Invoke progress callbacks with bytes transferred.
Args:
callbacks: List of callback functions
bytes_transferred (int): Number of bytes transferred
"""
def calculate_num_parts(size: int, part_size: int) -> int:
"""
Calculate number of parts needed for multipart upload.
Args:
size (int): Total size in bytes
part_size (int): Size per part in bytes
Returns:
int: Number of parts needed
"""
def calculate_range_parameter(start_range: int, end_range: int) -> str:
"""
Calculate HTTP Range parameter for partial downloads.
Args:
start_range (int): Start byte position
end_range (int): End byte position
Returns:
str: Range parameter string (e.g., 'bytes=0-1023')
"""
def get_filtered_dict(original_dict: dict, allowed_keys: List[str]) -> dict:
"""
Filter dictionary to only include allowed keys.
Args:
original_dict (dict): Original dictionary
allowed_keys (list): List of allowed keys
Returns:
dict: Filtered dictionary
"""
def random_file_extension(num_digits: int = 8) -> str:
"""
Generate random file extension.
Args:
num_digits (int): Number of digits in extension
Returns:
str: Random file extension
    """

Best practices:
- Use `with` statements for ReadFileChunk
- Use signal_transferring() and signal_not_transferring()
- Check is_special_file() before operations
- Use remove_file(), which handles missing files
- Use get_temp_filename() for atomic operations

Install with Tessl CLI
npx tessl i tessl/pypi-s3transfer