tessl/pypi-fsspec

Unified pythonic interface for diverse file systems and storage backends

Progress Callbacks

Extensible callback system for monitoring file transfer progress, supporting both built-in progress indicators and custom callback implementations. Callbacks provide real-time feedback during long-running filesystem operations like file uploads, downloads, and bulk transfers.

Capabilities

Base Callback Class

Foundation class for all progress callback implementations with standard progress tracking interface.

class Callback:
    """Base class for progress callbacks."""
    
    def __init__(self, size=None, value=0, hooks=None, **kwargs):
        """
        Initialize callback.
        
        Parameters:
        - size: int, total size/count for progress tracking
        - value: int, initial progress value
        - hooks: dict mapping a name to a callable; each hook is invoked as hook(size, value, **kwargs) when progress changes
        - **kwargs: additional callback-specific options
        """
    
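    def call(self, hook_name=None, **kwargs):
        """
        Run the registered hooks. Invoked by set_size, absolute_update
        and relative_update; subclasses override it to display progress.
        
        Parameters:
        - hook_name: str or None, run only this hook if given, otherwise all hooks
        - **kwargs: extra keyword arguments passed on to each hook
        
        Note: the update methods ignore any return value, so returning
        False from a callback does not abort a transfer.
        """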
    
    def __enter__(self):
        """
        Context manager entry.
        
        Returns:
        Callback, self
        """
    
    def __exit__(self, *args):
        """Context manager exit."""
    
    def set_size(self, size):
        """
        Set total size for progress tracking.
        
        Parameters:
        - size: int, total size/count
        """
    
    def absolute_update(self, value):
        """
        Set absolute progress value.
        
        Parameters:
        - value: int, current progress value
        """
    
    def relative_update(self, inc=1):
        """
        Update progress incrementally.
        
        Parameters:
        - inc: int, increment amount
        """
    
    def branched(self, path_1, path_2, **kwargs):
        """
        Return the callback to use for a child transfer, for example the
        per-file get_file calls made by get.
        
        Parameters:
        - path_1: str, source path of the child transfer
        - path_2: str, destination path of the child transfer
        - **kwargs: additional options
        
        Returns:
        Callback, callback for the child transfer; the base class returns
        the no-op DEFAULT_CALLBACK
        """
    
    def close(self):
        """Close callback and clean up resources."""

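The update methods can be exercised by hand, which helps when checking what a subclass will see. The sketch below assumes a hypothetical `RecordingCallback` (not part of fsspec) that overrides `call`, the method the base class invokes after each change to `size` or `value`.

```python
from fsspec.callbacks import Callback


class RecordingCallback(Callback):
    """Hypothetical helper: stores a (size, value) snapshot on every update."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.snapshots = []

    def call(self, hook_name=None, **kwargs):
        # self.size and self.value have already been updated by the caller
        self.snapshots.append((self.size, self.value))


with RecordingCallback() as cb:
    cb.set_size(3)         # total becomes 3, value still 0
    cb.relative_update()   # value 0 -> 1
    cb.relative_update(2)  # value 1 -> 3
    cb.absolute_update(3)  # value set to 3

print(cb.snapshots)  # [(3, 0), (3, 1), (3, 3), (3, 3)]
```

Overriding `call` rather than the individual update methods keeps the bookkeeping in the base class and the display logic in one place.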
Built-in Callback Implementations

Ready-to-use callback implementations for common progress display needs.

class TqdmCallback(Callback):
    """Progress bar callback using tqdm library."""
    
    def __init__(self, tqdm_kwargs=None, **kwargs):
        """
        Initialize tqdm progress bar callback.
        
        Parameters:
        - tqdm_kwargs: dict, options passed to tqdm constructor
        - **kwargs: additional callback options
        """

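class DotPrinterCallback(Callback):
    """Simple callback that prints one character per progress update."""
    
    def __init__(self, chr_to_print="#", **kwargs):
        """
        Initialize dot printer callback.
        
        Parameters:
        - chr_to_print: str, character printed on each update
        - **kwargs: additional callback options
        """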

class NoOpCallback(Callback):
    """No-operation callback that does nothing."""
    
    def __init__(self, **kwargs):
        """Initialize no-op callback."""

Default Callback Instance

Pre-configured default callback instance for immediate use.

DEFAULT_CALLBACK: Callback
    """Default callback instance (NoOpCallback)"""

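A common use of the default instance is in functions that accept an optional callback. The sketch below assumes a hypothetical helper `copy_chunks`; it relies on `Callback.as_callback`, which returns `DEFAULT_CALLBACK` when given `None`, so the body never has to check whether a callback was supplied.

```python
from fsspec.callbacks import DEFAULT_CALLBACK, Callback


def copy_chunks(chunks, sink, callback=None):
    """Hypothetical helper: append chunks to a list while reporting progress."""
    callback = Callback.as_callback(callback)  # None -> DEFAULT_CALLBACK (no-op)
    callback.set_size(len(chunks))
    for chunk in chunks:
        sink.append(chunk)
        callback.relative_update()
    return callback


# Without a callback, the shared no-op instance is used
used = copy_chunks([b"a", b"b"], [])
print(used is DEFAULT_CALLBACK)

# With a callback, progress is tracked on the instance passed in
tracked = copy_chunks([b"a", b"b", b"c"], [], callback=Callback())
print(tracked.value, "of", tracked.size)
```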
Usage Patterns

Basic Progress Monitoring

import fsspec

fs = fsspec.filesystem('s3')

# Using default tqdm progress bar (requires the tqdm package)
# Download with progress bar; with get/put the bar counts files transferred
fs.get('bucket/large-file.dat', 'local-file.dat',
       callback=fsspec.callbacks.TqdmCallback())

# Upload with a fresh progress bar, so the count starts again from zero
fs.put('large-local-file.dat', 'bucket/uploaded-file.dat',
       callback=fsspec.callbacks.TqdmCallback())

Context Manager Usage

# close() is called on exit, which finalizes the tqdm bar
with fsspec.callbacks.TqdmCallback() as callback:
    fs.get('bucket/large-file.dat', 'local-file.dat', callback=callback)
    # The finished bar stays on screen unless tqdm_kwargs={'leave': False} is passed

Custom Progress Display

import time

import fsspec

class CustomCallback(fsspec.Callback):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.start_time = time.time()
    
    def call(self, hook_name=None, **kwargs):
        # fsspec invokes call() after set_size/relative_update/absolute_update;
        # self.size and self.value already hold the latest numbers
        if not self.size:
            return  # total not known yet (or zero), nothing to compute
            
        percent = (self.value / self.size) * 100
        elapsed = time.time() - self.start_time
        rate = self.value / elapsed if elapsed > 0 else 0
        
        print(f"\rProgress: {percent:.1f}% ({self.value}/{self.size}) "
              f"Rate: {rate:.1f} bytes/sec", end='')

# Use custom callback; get_file reports bytes for a single file
fs = fsspec.filesystem('s3')
callback = CustomCallback()
fs.get_file('bucket/file.dat', 'local.dat', callback=callback)

Multi-Operation Callbacks

import fsspec

fs = fsspec.filesystem('s3')

files_to_download = [
    ('bucket/file1.dat', 'local1.dat'),
    ('bucket/file2.dat', 'local2.dat'),
    ('bucket/file3.dat', 'local3.dat'),
]
remote_paths = [remote for remote, local in files_to_download]
local_paths = [local for remote, local in files_to_download]

# One callback for the whole batch: get() sets the total to the number
# of files and advances the bar once per completed file. Calling get()
# once per file would reset the total on every call.
with fsspec.callbacks.TqdmCallback() as callback:
    fs.get(remote_paths, local_paths, callback=callback)

Branched Callbacks for Parallel Operations

import threading

import fsspec

fs = fsspec.filesystem('s3')

# Main callback for overall progress
main_callback = fsspec.callbacks.TqdmCallback()

# branched() takes the source and destination of each child transfer.
# Unless the callback class supplies child callbacks, it returns the
# no-op DEFAULT_CALLBACK, so these children report nothing by default.
branch1 = main_callback.branched('bucket/file1.dat', 'local1.dat')
branch2 = main_callback.branched('bucket/file2.dat', 'local2.dat')

# Use in parallel operations (conceptual - actual parallelism depends on implementation)
def download_file(remote, local, callback):
    fs.get_file(remote, local, callback=callback)

thread1 = threading.Thread(target=download_file, 
                          args=('bucket/file1.dat', 'local1.dat', branch1))
thread2 = threading.Thread(target=download_file,
                          args=('bucket/file2.dat', 'local2.dat', branch2))

thread1.start()
thread2.start()
thread1.join()
thread2.join()
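For per-file progress, a callback class has to hand out its own child callbacks. The following is a minimal sketch, assuming a hypothetical `TreeCallback` that overrides the `branch` hook (which `branched` consults) and using the in-memory filesystem so that it runs without network access. Newer fsspec releases suggest overriding `branched` directly; `branch` is used here on the assumption that it also works on older releases.

```python
import os
import tempfile

import fsspec
from fsspec.callbacks import Callback


class TreeCallback(Callback):
    """Hypothetical helper: counts files and creates one child per file."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.children = {}

    def branch(self, path_1, path_2, kwargs):
        # path_1 is the source, path_2 the destination of the child transfer
        child = Callback()
        self.children[path_1] = child
        kwargs["callback"] = child  # picked up by fsspec for this file


fs = fsspec.filesystem("memory")
fs.pipe("/branch-demo/a.bin", b"a" * 10)
fs.pipe("/branch-demo/b.bin", b"b" * 20)

parent = TreeCallback()
with tempfile.TemporaryDirectory() as tmp:
    remotes = ["/branch-demo/a.bin", "/branch-demo/b.bin"]
    targets = [os.path.join(tmp, "a.bin"), os.path.join(tmp, "b.bin")]
    fs.get(remotes, targets, callback=parent)

# The parent counts files; each child counts bytes of one file
print(parent.value, "of", parent.size, "files")
for path, child in parent.children.items():
    print(path, child.value, "of", child.size, "bytes")
```

Keeping the children in a dictionary keyed by source path is only one possible design; a real display would more likely render each child as its own progress bar.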

Cancellation Support

import threading
import time

import fsspec

class TransferCancelled(Exception):
    """Raised from the callback to abort a running transfer."""

class CancellableCallback(fsspec.Callback):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.cancelled = False
        
    def cancel(self):
        self.cancelled = True
        
    def call(self, hook_name=None, **kwargs):
        # Check for cancellation. fsspec ignores callback return values,
        # so raising is one way to stop; this assumes the backend lets
        # the exception propagate out of the transfer call.
        if self.cancelled:
            raise TransferCancelled("Operation cancelled by user")
            
        # Display progress
        if self.size:
            percent = (self.value / self.size) * 100
            print(f"\rProgress: {percent:.1f}%", end='')

# Usage with cancellation
fs = fsspec.filesystem('s3')
callback = CancellableCallback()

# Start operation in background
def download_with_callback():
    try:
        fs.get_file('bucket/very-large-file.dat', 'local.dat', callback=callback)
        print("\nDownload completed")
    except TransferCancelled:
        print("\nDownload cancelled; a partial local file may remain")
    except Exception as e:
        print(f"\nDownload failed: {e}")

download_thread = threading.Thread(target=download_with_callback)
download_thread.start()

# Cancel after 30 seconds
time.sleep(30)
callback.cancel()
download_thread.join()

Integration with Different Operations

import fsspec

fs = fsspec.filesystem('s3')
callback = fsspec.callbacks.TqdmCallback()

# File copy; the callback is forwarded to the backend, which may ignore it
fs.copy('bucket/source.dat', 'bucket/backup.dat', callback=callback)

# Recursive download with progress (counts files)
fs.get('bucket/data/', 'local-data/', recursive=True, callback=callback)

# Bulk upload with progress (counts files)
fs.put('local-data/', 'bucket/uploaded-data/', recursive=True, callback=callback)

# Manual progress over multiple files, using a fresh callback
files = fs.glob('bucket/data/*.csv')
with fsspec.callbacks.TqdmCallback() as loop_callback:
    loop_callback.set_size(len(files))
    for i, file_path in enumerate(files):
        content = fs.cat_file(file_path)
        process_data(content)  # process_data is a placeholder for your own function
        loop_callback.absolute_update(i + 1)

Configuring Tqdm Options

# Customize tqdm appearance and behavior
tqdm_options = {
    'unit': 'B',
    'unit_scale': True,
    'unit_divisor': 1024,
    'desc': 'Downloading',
    'ncols': 80,
    'ascii': True
}

callback = fsspec.callbacks.TqdmCallback(tqdm_kwargs=tqdm_options)

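# get_file reports bytes, which matches the byte units configured above.
# With 'ascii': True the bar is drawn with ASCII characters, roughly like:
# Downloading:  45%|####5     | 450M/1.00G [00:30<00:25, 15.0MB/s]
fs.get_file('bucket/large-file.dat', 'local.dat', callback=callback)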

Logging Progress Information

import logging

import fsspec

class LoggingCallback(fsspec.Callback):
    def __init__(self, log_interval=1024*1024, **kwargs):  # Log every MB
        super().__init__(**kwargs)
        self.log_interval = log_interval
        self.last_logged = 0
        self.logger = logging.getLogger(__name__)
        
    def set_size(self, size):
        self.logger.info(f"Starting operation, total size: {size} bytes")
        super().set_size(size)
        
    def call(self, hook_name=None, **kwargs):
        # Log at intervals, measured in the units of self.value
        # (bytes when used with get_file/put_file)
        if self.value - self.last_logged >= self.log_interval:
            if self.size:
                percent = (self.value / self.size) * 100
                self.logger.info(f"Progress: {percent:.1f}% ({self.value}/{self.size} bytes)")
            self.last_logged = self.value

# Use logging callback
logging.basicConfig(level=logging.INFO)
fs = fsspec.filesystem('s3')
callback = LoggingCallback()
fs.get_file('bucket/file.dat', 'local.dat', callback=callback)

Callback Hooks for Events

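import fsspec

# Each hook is called as hook(size, value, **kwargs) whenever the
# callback's size or value changes. The base class has no built-in
# start, complete or error events.
def print_progress(size, value, **kwargs):
    print(f"Transferred {value} of {size}")

def warn_if_unsized(size, value, **kwargs):
    if size is None:
        print("Total size is not known yet")

# Hook names are arbitrary labels chosen by the caller
hooks = {
    'progress': print_progress,
    'unsized': warn_if_unsized
}

callback = fsspec.Callback(hooks=hooks)
callback.set_size(100)        # runs every registered hook
callback.relative_update(10)  # runs every registered hook again
callback.call('progress')     # runs only the named hook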

Callback Guidelines

When to Use Callbacks

  • Long-running operations: File transfers, directory syncing
  • Large files: Multi-gigabyte uploads/downloads
  • Batch operations: Processing many files
  • User interfaces: Providing feedback in GUI applications
  • Monitoring: Logging progress for automated systems

Callback Performance Considerations

  • Update frequency: Avoid redrawing on every update; throttle output so it fires at most once every few KB or at a fixed time interval
  • UI responsiveness: Keep callback processing lightweight
  • Thread safety: Ensure callbacks are thread-safe for parallel operations
  • Resource cleanup: Always close/cleanup callbacks when done
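The update-frequency and thread-safety points above can be sketched as a throttling wrapper. This is a minimal illustration, assuming a hypothetical `ThrottledCallback` class and a caller-supplied `report` function; neither is part of fsspec. The lock protects only the throttle state, not the base class's own counters.

```python
import threading
import time

from fsspec.callbacks import Callback


class ThrottledCallback(Callback):
    """Hypothetical helper: forwards at most one report per min_interval seconds."""

    def __init__(self, report, min_interval=0.5, clock=time.monotonic, **kwargs):
        super().__init__(**kwargs)
        self._report = report
        self._min_interval = min_interval
        self._clock = clock  # injectable so the behaviour can be tested
        self._last = None
        self._lock = threading.Lock()

    def call(self, hook_name=None, **kwargs):
        now = self._clock()
        with self._lock:
            done = self.size is not None and self.value >= self.size
            due = self._last is None or now - self._last >= self._min_interval
            if not (due or done):
                return  # too soon since the last report
            self._last = now
        # The final update is always reported, even if it arrives early
        self._report(self.size, self.value)


def print_line(size, value):
    print(f"{value}/{size}")


with ThrottledCallback(print_line, min_interval=0.5) as cb:
    cb.set_size(1000)
    for _ in range(10):
        cb.relative_update(100)
```

Using the context manager covers the resource-cleanup point as well, since `close()` runs on exit.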

Error Handling in Callbacks

import logging

import fsspec

class RobustCallback(fsspec.Callback):
    def update_display(self):
        # Placeholder for your own display logic
        print(f"\r{self.value}/{self.size}", end='')

    def call(self, hook_name=None, **kwargs):
        try:
            # Update progress display
            self.update_display()
        except Exception as e:
            # Log error but don't fail the transfer
            logging.error(f"Callback error: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-fsspec
