tessl/pypi-s3transfer

An Amazon S3 Transfer Manager that provides high-level abstractions for efficient uploads/downloads with multipart transfers, progress callbacks, and retry logic.

File Utilities and Progress Tracking

File-handling utilities: chunk readers, progress-reporting streams, OS operations, and callback helpers for monitoring transfer progress and managing file operations.

Capabilities

ReadFileChunk

File-like reader over a bounded segment of a file. It reports progress through a callback as data is read and tracks whether a transfer is currently active.

class ReadFileChunk:
    """
    File-like object for reading chunks of files with progress callbacks and transfer state management.
    
    Args:
        fileobj: File object to read from
        start_byte (int): Starting position in file
        chunk_size (int): Maximum chunk size to read
        full_file_size (int): Total file size
        callback (callable, optional): Progress callback function(bytes_read)
        enable_callback (bool): Whether to enable callbacks initially
    """
    def __init__(
        self,
        fileobj,
        start_byte: int,
        chunk_size: int,
        full_file_size: int,
        callback=None,
        enable_callback: bool = True
    ): ...
    
    @classmethod
    def from_filename(
        cls,
        filename: str,
        start_byte: int,
        chunk_size: int,
        callback=None,
        enable_callback: bool = True
    ):
        """
        Create ReadFileChunk from filename.
        
        Args:
            filename (str): Path to file
            start_byte (int): Starting position in file
            chunk_size (int): Maximum chunk size to read
            callback (callable, optional): Progress callback function
            enable_callback (bool): Whether to enable callbacks initially
        
        Returns:
            ReadFileChunk: New instance
        """
    
    def read(self, amount=None) -> bytes:
        """
        Read data from chunk.
        
        Args:
            amount (int, optional): Number of bytes to read (default: all remaining)
        
        Returns:
            bytes: Data read from chunk
        """
    
    def seek(self, where: int):
        """
        Seek to position within chunk.
        
        Args:
            where (int): Position to seek to (relative to chunk start)
        """
    
    def tell(self) -> int:
        """
        Get current position within chunk.
        
        Returns:
            int: Current position relative to chunk start
        """
    
    def close(self):
        """Close the underlying file object."""
    
    def signal_transferring(self):
        """Signal that transfer is currently active."""
    
    def signal_not_transferring(self):
        """Signal that transfer is not currently active."""
    
    def enable_callback(self):
        """Enable progress callbacks."""
    
    def disable_callback(self):
        """Disable progress callbacks."""
    
    def __len__(self) -> int:
        """Return the size of this chunk."""
    
    def __enter__(self):
        """Context manager entry."""
        return self
    
    def __exit__(self, *args, **kwargs):
        """Context manager exit."""
        self.close()
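
To make the chunk-relative behaviour of `read()`, `seek()`, and `tell()` concrete, here is a minimal standard-library sketch. `BoundedChunkReader` is a hypothetical name used only for illustration; it is not the s3transfer implementation, and it assumes the callback receives the byte count of each read.

```python
import io


class BoundedChunkReader:
    """Hypothetical stand-in exposing bytes [start_byte, start_byte + chunk_size)."""

    def __init__(self, fileobj, start_byte, chunk_size, callback=None):
        self._fileobj = fileobj
        self._start_byte = start_byte
        self._size = chunk_size
        self._position = 0  # offset relative to the chunk start
        self._callback = callback
        self._fileobj.seek(start_byte)

    def read(self, amount=None):
        # Never read past the end of the chunk, even if more file data exists.
        remaining = self._size - self._position
        to_read = remaining if amount is None else min(amount, remaining)
        data = self._fileobj.read(to_read)
        self._position += len(data)
        if self._callback is not None and data:
            self._callback(len(data))
        return data

    def seek(self, where):
        # Translate the chunk-relative offset into an absolute file offset.
        self._fileobj.seek(self._start_byte + where)
        self._position = where

    def tell(self):
        return self._position

    def __len__(self):
        return self._size


reported = []
reader = BoundedChunkReader(
    io.BytesIO(b"0123456789abcdef"), start_byte=4, chunk_size=8,
    callback=reported.append,
)
first = reader.read(3)  # three bytes from the start of the chunk
rest = reader.read()    # everything left, stopping at the chunk boundary
```

The second `read()` stops after eight bytes in total even though the underlying stream holds more data, which is the property that lets each part of a multipart upload be read independently.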

StreamReaderProgress

Wrapper for read-only streams that adds progress callback functionality for monitoring data consumption.

class StreamReaderProgress:
    """
    Wrapper for read-only streams that adds progress callbacks.
    
    Args:
        stream: Stream to wrap (must support read())
        callback (callable, optional): Progress callback function(bytes_read)
    """
    def __init__(self, stream, callback=None): ...
    
    def read(self, *args, **kwargs) -> bytes:
        """
        Read from stream with progress tracking.
        
        Args:
            *args: Arguments passed to underlying stream.read()
            **kwargs: Keyword arguments passed to underlying stream.read()
        
        Returns:
            bytes: Data read from stream
        """

DeferredOpenFile

File-like object that defers opening the underlying file until it is first accessed, so a file operation can be prepared without holding an open file handle.

class DeferredOpenFile:
    """
    File-like object that defers opening until first access.
    
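    Args:
        filename (str): Path to file
        start_byte (int): Byte offset to seek to when the file is opened (default: 0)
        mode (str): File open mode (default: 'rb')
        open_function (callable, optional): Function used to open the file (default: built-in open)
    """
    def __init__(self, filename: str, start_byte: int = 0, mode: str = 'rb', open_function=open): ...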
    
    def read(self, amount=None) -> bytes:
        """
        Read from file, opening if necessary.
        
        Args:
            amount (int, optional): Number of bytes to read
        
        Returns:
            bytes: Data read from file
        """
    
    def write(self, data: bytes) -> int:
        """
        Write to file, opening if necessary.
        
        Args:
            data (bytes): Data to write
        
        Returns:
            int: Number of bytes written
        """
    
    def seek(self, where: int, whence: int = 0):
        """
        Seek to position in file, opening if necessary.
        
        Args:
            where (int): Position to seek to
            whence (int): Reference point for position
        """
    
    def tell(self) -> int:
        """
        Get current position in file, opening if necessary.
        
        Returns:
            int: Current file position
        """
    
    def close(self):
        """Close file if open."""
    
    @property
    def name(self) -> str:
        """
        Get filename.
        
        Returns:
            str: Filename
        """

OSUtils

OS-level helpers for file operations: size queries, opening files and chunk readers, removal, renaming, special-file detection, temporary filenames, and space allocation.

class OSUtils:
    """
    Enhanced OS utility functions for file operations.
    """
    def get_file_size(self, filename: str) -> int:
        """
        Get file size in bytes.
        
        Args:
            filename (str): Path to file
        
        Returns:
            int: File size in bytes
        
        Raises:
            OSError: If file cannot be accessed
        """
    
    def open_file_chunk_reader(self, filename: str, start_byte: int, size: int, callbacks):
        """
        Open a file chunk reader with progress callback.
        
        Args:
            filename (str): Path to file
            start_byte (int): Starting position in file
            size (int): Size of chunk to read
            callbacks: Progress callback functions (list or single callback)
        
        Returns:
            ReadFileChunk: File chunk reader instance
        """
    
    def open_file_chunk_reader_from_fileobj(self, fileobj, chunk_size, full_file_size, callbacks, close_callbacks=None):
        """
        Open a file chunk reader from existing file object.
        
        Args:
            fileobj: File object to read from
            chunk_size: Size of chunk to read
            full_file_size: Full size of the file
            callbacks: Progress callback functions (list or single callback)
            close_callbacks: Callbacks to execute when closing (optional)
        
        Returns:
            ReadFileChunk: File chunk reader instance
        """
    
    def open(self, filename: str, mode: str):
        """
        Open a file.
        
        Args:
            filename (str): Path to file
            mode (str): File open mode
        
        Returns:
            File object
        """
    
    def remove_file(self, filename: str):
        """
        Remove a file (no-op if doesn't exist).
        
        Args:
            filename (str): Path to file to remove
        """
    
    def rename_file(self, current_filename: str, new_filename: str):
        """
        Rename a file.
        
        Args:
            current_filename (str): Current filename
            new_filename (str): New filename
        """
    
    def is_special_file(self, filename: str) -> bool:
        """
        Check if file is a special file (device, pipe, etc.).
        
        Args:
            filename (str): Path to file
        
        Returns:
            bool: True if file is special, False otherwise
        """
    
    def get_temp_filename(self, filename: str) -> str:
        """
        Get a temporary filename based on the given filename.
        
        Args:
            filename (str): Base filename
        
        Returns:
            str: Temporary filename
        """
    
    def allocate(self, filename: str, size: int):
        """
        Allocate space for a file.
        
        Args:
            filename (str): Path to file
            size (int): Size in bytes to allocate
        """

Utility Classes

Additional utility classes for managing callbacks, function containers, and semaphores.

class CallArgs:
    """
    Records and stores call arguments as attributes.
    
    Args:
        **kwargs: Keyword arguments to store as attributes
    """
    def __init__(self, **kwargs): ...

class FunctionContainer:
    """
    Container for storing function with args and kwargs.
    
    Args:
        function: Function to store
        *args: Positional arguments for function
        **kwargs: Keyword arguments for function
    """
    def __init__(self, function, *args, **kwargs): ...

class CountCallbackInvoker:
    """
    Invokes callback once finalize() has been called and the internal count reaches zero.
    
    Args:
        callback (callable): Function to call when count reaches zero
    """
    def __init__(self, callback): ...
    
    def increment(self):
        """Increment the counter."""
    
    def decrement(self):
        """Decrement the counter, calling callback if it reaches zero after finalize()."""
    
    def finalize(self):
        """Mark counting as complete; callback fires immediately if the count is already zero, otherwise on the decrement that reaches zero."""
    
    @property
    def current_count(self) -> int:
        """
        Current count value.
        
        Returns:
            int: Current count
        """

class TaskSemaphore:
    """
    Semaphore for coordinating task execution with tagging support.
    
    Args:
        capacity (int): Maximum number of permits
    """
    def __init__(self, capacity: int): ...
    
    def acquire(self, task_tag, blocking: bool = True):
        """
        Acquire a permit.
        
        Args:
            task_tag: Tag identifying the task type
            blocking (bool): Whether to block if no permits available
        
        Returns:
            Token: Acquire token for later release
        """
    
    def release(self, task_tag, acquire_token):
        """
        Release a permit.
        
        Args:
            task_tag: Tag identifying the task type
            acquire_token: Token from acquire() call
        """

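class ChunksizeAdjuster:
    """
    Adjusts chunk sizes to comply with S3 multipart upload limits.
    
    Args:
        max_size (int): Maximum allowed chunk size
        min_size (int): Minimum allowed chunk size
        max_parts (int): Maximum number of parts allowed (default: 10000)
    """
    def __init__(self, max_size: int = MAX_SINGLE_UPLOAD_SIZE, min_size: int = MIN_UPLOAD_CHUNKSIZE, max_parts: int = MAX_PARTS): ...
    
    def adjust_chunksize(self, current_chunksize: int, file_size: int = None) -> int:
        """
        Adjust chunk size to ensure number of parts doesn't exceed limit.
        
        Args:
            current_chunksize (int): Current chunk size
            file_size (int, optional): Total file size
        
        Returns:
            int: Adjusted chunk size
        """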

Usage Examples

Basic File Chunk Reading

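# The callback= signature used here is the one on the package-level ReadFileChunk;
# s3transfer.utils.ReadFileChunk takes a list via callbacks=[...] instead.
from s3transfer import ReadFileChunk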

def progress_callback(bytes_read):
    print(f"Read {bytes_read} bytes")

# Read a specific chunk of a file
with ReadFileChunk.from_filename(
    '/tmp/large_file.dat',
    start_byte=1024,           # Start at byte 1024
    chunk_size=8 * 1024 * 1024, # Read up to 8MB
    callback=progress_callback
) as chunk:
    data = chunk.read(1024)    # Read 1KB
    print(f"Current position: {chunk.tell()}")
    
    chunk.seek(2048)           # Seek to byte 2048 within chunk
    more_data = chunk.read()   # Read remaining data in chunk

Progress Tracking for Stream Reading

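# The single-callback constructor used here is the one on the package-level class;
# s3transfer.utils.StreamReaderProgress takes a list of callbacks instead.
from s3transfer import StreamReaderProgress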
import boto3

def download_progress(bytes_read):
    print(f"Downloaded {bytes_read} bytes")

# Download with progress tracking
client = boto3.client('s3')
response = client.get_object(Bucket='my-bucket', Key='large-file.dat')

# Wrap the streaming body with progress tracking
progress_stream = StreamReaderProgress(response['Body'], download_progress)

# Read in chunks
with open('/tmp/downloaded.dat', 'wb') as f:
    while True:
        chunk = progress_stream.read(8192)  # 8KB chunks
        if not chunk:
            break
        f.write(chunk)
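
What such a wrapper does internally can be sketched with the standard library alone. `ProgressReader` below is a hypothetical illustration, not the library class; it assumes the callback takes the byte count of each read as its only argument, and it uses an in-memory stream in place of an S3 response body.

```python
import io


class ProgressReader:
    """Hypothetical wrapper: forwards read() and reports the size of each read."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        data = self._stream.read(*args, **kwargs)
        # Skip the callback for the empty read that signals end of stream.
        if self._callback is not None and data:
            self._callback(len(data))
        return data


sizes = []
wrapped = ProgressReader(io.BytesIO(b"x" * 10_000), sizes.append)
while wrapped.read(4096):
    pass

total_reported = sum(sizes)
```

Because the callback receives per-read increments rather than a running total, the caller is responsible for summing them, as `total_reported` does here.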

Deferred File Operations

from s3transfer.utils import DeferredOpenFile

# Create deferred file (doesn't open yet); binary mode because bytes are written
deferred_file = DeferredOpenFile('/tmp/output.txt', mode='wb')

# File is opened only when first accessed
deferred_file.write(b'Hello, world!')  # File opened here
deferred_file.write(b'More data')      # File already open

print(f"Filename: {deferred_file.name}")
deferred_file.close()
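
The deferral itself is a small pattern. The following sketch, using a hypothetical `LazyFile` class and a temporary directory, shows one way to implement it; it is an assumption-laden illustration and omits features such as seeking.

```python
import os
import tempfile


class LazyFile:
    """Hypothetical sketch: the file is opened on the first read or write."""

    def __init__(self, filename, mode="rb", open_function=open):
        self._filename = filename
        self._mode = mode
        self._open_function = open_function
        self._fileobj = None

    @property
    def name(self):
        return self._filename

    @property
    def is_open(self):
        return self._fileobj is not None

    def _open_if_needed(self):
        if self._fileobj is None:
            self._fileobj = self._open_function(self._filename, self._mode)

    def read(self, amount=None):
        self._open_if_needed()
        return self._fileobj.read(-1 if amount is None else amount)

    def write(self, data):
        self._open_if_needed()
        return self._fileobj.write(data)

    def close(self):
        if self._fileobj is not None:
            self._fileobj.close()
            self._fileobj = None


path = os.path.join(tempfile.mkdtemp(), "lazy.bin")
lazy = LazyFile(path, mode="wb")
exists_before_write = os.path.exists(path)  # nothing has touched the disk yet
written = lazy.write(b"hello")              # the file is opened here
lazy.close()
```

Constructing many such objects is cheap because no file descriptor is consumed until one of them is actually used.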

Advanced OS Utilities

from s3transfer.utils import OSUtils
import os

osutil = OSUtils()

# File size operations
filename = '/tmp/test_file.dat'
file_size = osutil.get_file_size(filename)
print(f"File size: {file_size} bytes")

# Check if file is special (device, pipe, etc.)
if osutil.is_special_file(filename):
    print("File is a special file")
else:
    print("File is a regular file")

# Get temporary filename
temp_filename = osutil.get_temp_filename(filename)
print(f"Temporary filename: {temp_filename}")

# Safe file operations
osutil.remove_file('/tmp/might_not_exist.txt')  # No error if doesn't exist

# Allocate space for large file (on supported filesystems)
try:
    osutil.allocate('/tmp/large_file.dat', 1024 * 1024 * 1024)  # 1GB
    print("Space allocated successfully")
except OSError as e:
    print(f"Space allocation failed: {e}")
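
For readers curious what a special-file check involves, here is a sketch built on the standard `stat` module. `looks_like_special_file` is a hypothetical helper; it assumes "special" means character devices, block devices, FIFOs, and sockets, and it treats missing paths as not special.

```python
import os
import stat
import tempfile


def looks_like_special_file(filename):
    """Hypothetical check: True for character/block devices, FIFOs and sockets."""
    if not os.path.exists(filename):
        return False
    mode = os.stat(filename).st_mode
    return (
        stat.S_ISCHR(mode)
        or stat.S_ISBLK(mode)
        or stat.S_ISFIFO(mode)
        or stat.S_ISSOCK(mode)
    )


with tempfile.NamedTemporaryFile() as regular:
    regular_is_special = looks_like_special_file(regular.name)

missing_is_special = looks_like_special_file("/no/such/path/exists/here")
```

A check like this matters before operations that assume a seekable file of known size, since pipes and devices support neither reliably.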

Chunk Size Adjustment

from s3transfer.utils import ChunksizeAdjuster
import math

# max_parts is configured on the adjuster itself (S3 allows at most 10,000 parts)
adjuster = ChunksizeAdjuster(max_parts=10000)

# Adjust chunk size for large file to stay within S3 limits
file_size = 5 * 1024 * 1024 * 1024  # 5GB
current_chunk_size = 8 * 1024 * 1024  # 8MB

adjusted_size = adjuster.adjust_chunksize(
    current_chunksize=current_chunk_size,
    file_size=file_size
)

print(f"Original chunk size: {current_chunk_size}")
print(f"Adjusted chunk size: {adjusted_size}")
# Round up: a final partial chunk still counts as a part
print(f"Number of parts: {math.ceil(file_size / adjusted_size)}")
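
The constraint behind the adjustment is that the rounded-up part count must not exceed the part limit. The sketch below works through that arithmetic with a hypothetical `fit_chunksize` helper; it picks the smallest chunk size that fits, which is an assumption of this example and not necessarily the strategy the library uses.

```python
MAX_PARTS = 10_000  # part limit used throughout this document
MIB = 1024 * 1024
GIB = 1024 * MIB


def ceil_div(numerator, denominator):
    """Integer ceiling division (avoids float rounding on large sizes)."""
    return -(-numerator // denominator)


def fit_chunksize(current_chunksize, file_size, max_parts=MAX_PARTS):
    """Hypothetical helper: keep the chunk size unless it would exceed max_parts."""
    return max(current_chunksize, ceil_div(file_size, max_parts))


unchanged = fit_chunksize(8 * MIB, 5 * GIB)    # 640 parts already fit
enlarged = fit_chunksize(8 * MIB, 100 * GIB)   # 12,800 parts would not fit
parts_after = ceil_div(100 * GIB, enlarged)
```

With 8 MiB chunks a 5 GiB file needs only 640 parts, so nothing changes; a 100 GiB file would need 12,800, so the chunk size has to grow until the count drops to the limit.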

Callback Management

from s3transfer.utils import CountCallbackInvoker

def completion_callback():
    print("All operations completed!")

# Create callback invoker; the callback fires once finalize() has been called
# and the count is back at zero
invoker = CountCallbackInvoker(completion_callback)

# Simulate multiple operations
operations = ['upload1', 'upload2', 'upload3']

# Increment for each operation
for op in operations:
    invoker.increment()
    print(f"Started operation: {op}")

# No more operations will be registered
invoker.finalize()

print(f"Current count: {invoker.current_count}")

# Decrement as operations complete
for op in operations:
    invoker.decrement()
    print(f"Completed operation: {op}, remaining: {invoker.current_count}")
    # Callback is called by the decrement that brings the count to zero
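
The count-down pattern can be sketched in a few lines of thread-safe Python. `CountdownInvoker` is a hypothetical illustration; in this sketch the callback fires only after `finalize()` has been called and the count has returned to zero, which prevents it from firing early while work is still being registered.

```python
import threading


class CountdownInvoker:
    """Hypothetical sketch: fire the callback once finalized and the count is zero."""

    def __init__(self, callback):
        self._callback = callback
        self._count = 0
        self._finalized = False
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            if self._finalized:
                raise RuntimeError("cannot increment after finalize()")
            self._count += 1

    def decrement(self):
        with self._lock:
            if self._count == 0:
                raise RuntimeError("count is already zero")
            self._count -= 1
            if self._finalized and self._count == 0:
                self._callback()

    def finalize(self):
        with self._lock:
            self._finalized = True
            if self._count == 0:
                self._callback()


fired = []
invoker = CountdownInvoker(lambda: fired.append("done"))
for _ in range(3):
    invoker.increment()
invoker.finalize()  # no more work will be registered

# Each worker thread reports completion by decrementing once.
workers = [threading.Thread(target=invoker.decrement) for _ in range(3)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
```

The lock guarantees that exactly one decrement observes the transition to zero, so the callback runs once regardless of how the worker threads interleave.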

Task Coordination with Semaphores

from s3transfer.utils import TaskSemaphore
import threading
import time

# Create semaphore for limiting concurrent operations
semaphore = TaskSemaphore(5)  # Max 5 concurrent operations

def perform_operation(task_id):
    # Acquire permit
    token = semaphore.acquire('upload_task')
    try:
        print(f"Performing operation {task_id}")
        # Simulate work
        time.sleep(1)
        print(f"Completed operation {task_id}")
    finally:
        # Always release permit
        semaphore.release('upload_task', token)

# Start multiple operations (only 5 will run concurrently)

threads = []
for i in range(10):
    thread = threading.Thread(target=perform_operation, args=(i,))
    threads.append(thread)
    thread.start()

# Wait for all to complete
for thread in threads:
    thread.join()
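
To check that a semaphore really bounds concurrency, it helps to record the peak number of simultaneous workers. This sketch uses the standard `threading.Semaphore` rather than the library class; the names `permits`, `stats`, and `guarded_task` are illustrative assumptions.

```python
import threading
import time

MAX_CONCURRENT = 2
permits = threading.Semaphore(MAX_CONCURRENT)
stats = {"running": 0, "peak": 0}
stats_lock = threading.Lock()


def guarded_task():
    permits.acquire()
    try:
        with stats_lock:
            stats["running"] += 1
            stats["peak"] = max(stats["peak"], stats["running"])
        time.sleep(0.01)  # stand-in for real work
        with stats_lock:
            stats["running"] -= 1
    finally:
        permits.release()  # released even if the work raises


threads = [threading.Thread(target=guarded_task) for _ in range(6)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# A non-blocking acquire returns False instead of waiting when no permit is free.
permits.acquire()
permits.acquire()
got_third = permits.acquire(blocking=False)
permits.release()
permits.release()
```

The recorded peak never exceeds `MAX_CONCURRENT`, and the non-blocking attempt shows how a caller can back off rather than stall when the limit is reached.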

File Chunk Reading with Transfer State

from s3transfer.utils import ReadFileChunk
import os

def transfer_progress(bytes_transferred):
    # s3transfer.utils invokes each callback with a bytes_transferred keyword
    print(f"Transfer progress: {bytes_transferred} bytes")

filename = '/tmp/large_upload.dat'
chunk_size = 64 * 1024 * 1024  # 64MB chunks
file_size = os.path.getsize(filename)

# Read file in chunks for multipart upload
chunks_processed = 0
start_byte = 0

while start_byte < file_size:
    # The utils variant of ReadFileChunk takes a list of callbacks
    with ReadFileChunk.from_filename(
        filename,
        start_byte=start_byte,
        chunk_size=chunk_size,
        callbacks=[transfer_progress]
    ) as chunk:
        # Signal that transfer is active
        chunk.signal_transferring()
        
        try:
            # Process chunk (simulate upload); the callback fires during read()
            data = chunk.read()
            print(f"Processing chunk {chunks_processed + 1}, size: {len(data)}")
            
            # Simulate upload process
            bytes_uploaded = 0
            while bytes_uploaded < len(data):
                # Upload in smaller increments
                increment = min(8192, len(data) - bytes_uploaded)
                bytes_uploaded += increment
                # No callback fires here; progress was reported by chunk.read() above
            
        finally:
            # Signal transfer is no longer active
            chunk.signal_not_transferring()
    
    chunks_processed += 1
    start_byte += chunk_size

print(f"Processed {chunks_processed} chunks total")
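
The loop above walks the file in `chunk_size` steps. The part count and the byte range of every part can also be computed up front, as in this sketch; `num_parts`, `range_header`, and `part_ranges` are hypothetical helpers written for this example, and the ranges use the inclusive `bytes=start-end` form of the HTTP Range header.

```python
def num_parts(size, part_size):
    """Ceiling division: parts needed to cover `size` bytes."""
    return (size + part_size - 1) // part_size


def range_header(start, end):
    """HTTP Range value for the inclusive byte span [start, end]."""
    return f"bytes={start}-{end}"


def part_ranges(size, part_size):
    """Range values for every part; the last part may be shorter."""
    ranges = []
    for index in range(num_parts(size, part_size)):
        start = index * part_size
        end = min(start + part_size, size) - 1
        ranges.append(range_header(start, end))
    return ranges


ranges = part_ranges(size=2500, part_size=1024)
```

Note the off-by-one hazard: Range end positions are inclusive, so a 1024-byte part starting at 0 ends at 1023, and the final part ends at `size - 1`.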

Utility Functions

Progress and Callback Utilities

def get_callbacks(subscribers, callback_type: str) -> List[callable]:
    """
    Extract callbacks of a specific type from subscriber objects.
    
    Args:
        subscribers: List of subscriber objects
        callback_type (str): Type of callback to extract
    
    Returns:
        list: List of callback functions
    """

def invoke_progress_callbacks(callbacks: List[callable], bytes_transferred: int):
    """
    Invoke progress callbacks with bytes transferred.
    
    Args:
        callbacks: List of callback functions
        bytes_transferred (int): Number of bytes transferred
    """

def calculate_num_parts(size: int, part_size: int) -> int:
    """
    Calculate number of parts needed for multipart upload.
    
    Args:
        size (int): Total size in bytes
        part_size (int): Size per part in bytes
    
    Returns:
        int: Number of parts needed
    """

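def calculate_range_parameter(part_size: int, part_index: int, num_parts: int, total_size: int = None) -> str:
    """
    Calculate HTTP Range parameter for the ranged download of one part.
    
    Args:
        part_size (int): Size of each part in bytes
        part_index (int): Zero-based index of the part
        num_parts (int): Total number of parts
        total_size (int, optional): Total size of the object, used to bound the final part
    
    Returns:
        str: Range parameter string (e.g., 'bytes=0-1023')
    """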

def get_filtered_dict(original_dict: dict, allowed_keys: List[str]) -> dict:
    """
    Filter dictionary to only include allowed keys.
    
    Args:
        original_dict (dict): Original dictionary
        allowed_keys (list): List of allowed keys
    
    Returns:
        dict: Filtered dictionary
    """

def random_file_extension(num_digits: int = 8) -> str:
    """
    Generate random file extension.
    
    Args:
        num_digits (int): Number of digits in extension
    
    Returns:
        str: Random file extension
    """

Best Practices

File Chunk Reading

  1. Use context managers: Always use with statements for ReadFileChunk
  2. Handle large files: Use appropriate chunk sizes for memory management
  3. Monitor progress: Implement progress callbacks for user feedback
  4. Signal transfer state: Use signal_transferring() and signal_not_transferring()

Progress Tracking

  1. Provide meaningful feedback: Use progress callbacks to inform users
  2. Handle zero-byte transfers: Check for empty files or streams
  3. Aggregate progress: Combine progress from multiple sources when needed
  4. Debounce callbacks: Avoid excessive callback frequency for performance
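
Points 2 to 4 can be combined in one small callable. `ThrottledProgress` is a hypothetical sketch: it sums increments from any number of sources under a lock, reports only after at least `step` new bytes have accumulated, and always reports on completion, including the zero-byte case.

```python
import threading


class ThrottledProgress:
    """Hypothetical aggregator: sums byte counts and reports every `step` bytes."""

    def __init__(self, total_size, report, step):
        self._total = total_size
        self._report = report
        self._step = step
        self._seen = 0
        self._last_reported = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_read):
        with self._lock:
            self._seen += bytes_read
            finished = self._seen >= self._total
            if finished or self._seen - self._last_reported >= self._step:
                self._last_reported = self._seen
                self._report(self._seen, self._total)


reports = []
progress = ThrottledProgress(
    total_size=10_000,
    report=lambda seen, total: reports.append(seen),
    step=4_000,
)
for _ in range(10):
    progress(1_000)  # ten small updates, possibly from several readers

# A zero-byte transfer still produces one completion report.
empty_reports = []
ThrottledProgress(0, lambda seen, total: empty_reports.append(seen), 4_000)(0)
```

Ten incoming updates collapse into three outgoing reports, which keeps a progress bar responsive without redrawing it on every read.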

Resource Management

  1. Close files properly: Use context managers or explicit close() calls
  2. Handle exceptions: Ensure cleanup even when errors occur
  3. Limit memory usage: Don't read entire large files into memory
  4. Validate file operations: Check file existence and permissions
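
As a small illustration of points 1 and 3, the sketch below hashes a payload block by block inside a `with` statement, so memory use is bounded by the block size and the source is closed even if an error occurs. `iter_blocks` is a hypothetical helper, and an in-memory stream stands in for a large file.

```python
import hashlib
import io


def iter_blocks(fileobj, block_size):
    """Yield successive blocks of at most block_size bytes until EOF."""
    while True:
        block = fileobj.read(block_size)
        if not block:
            return
        yield block


payload = b"abc" * 100_000           # 300,000 bytes standing in for a large file
digest = hashlib.sha256()
largest_block = 0
with io.BytesIO(payload) as source:  # closed even if hashing raises
    for block in iter_blocks(source, 64 * 1024):
        largest_block = max(largest_block, len(block))
        digest.update(block)

streamed_hex = digest.hexdigest()
```

The streamed digest matches what hashing the whole payload at once would produce, while no single read ever exceeds 64 KiB.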

OS Utilities

  1. Handle cross-platform differences: Use OSUtils for portable file operations
  2. Check special files: Use is_special_file() before operations
  3. Safe file removal: Use remove_file() which handles missing files
  4. Temporary files: Use get_temp_filename() for atomic operations
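
The temporary-file pattern in point 4 can be sketched with the standard library. `write_atomically` is a hypothetical helper, and the random suffix is an assumption about how a temporary name might be derived; the key idea is that readers never observe a half-written destination because the rename happens only after the write succeeds.

```python
import os
import tempfile
import uuid


def write_atomically(destination, data):
    """Hypothetical helper: write to a sibling temp file, then rename into place."""
    temp_name = f"{destination}.{uuid.uuid4().hex[:8]}"
    try:
        with open(temp_name, "wb") as f:
            f.write(data)
        # os.replace overwrites the destination in one step when both paths
        # are on the same filesystem.
        os.replace(temp_name, destination)
    except BaseException:
        # Clean up the partial file; ignore it if it was never created.
        try:
            os.remove(temp_name)
        except FileNotFoundError:
            pass
        raise


target = os.path.join(tempfile.mkdtemp(), "result.dat")
write_atomically(target, b"payload")
leftovers = os.listdir(os.path.dirname(target))
```

Keeping the temporary file next to the destination, rather than in a system temp directory, is what keeps the rename on a single filesystem.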
