Unified pythonic interface for diverse file systems and storage backends
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Extensible callback system for monitoring file transfer progress, supporting both built-in progress indicators and custom callback implementations. Callbacks provide real-time feedback during long-running filesystem operations like file uploads, downloads, and bulk transfers.
Foundation class for all progress callback implementations with standard progress tracking interface.
class Callback:
    """Base class for progress callbacks used by filesystem operations."""

    def __init__(self, size=None, value=0, hooks=None, **kwargs):
        """Create a callback.

        Parameters
        ----------
        size : int, optional
            Total size/count used for progress tracking.
        value : int
            Starting progress value.
        hooks : dict, optional
            Event hooks keyed by progress-event name.
        **kwargs
            Additional callback-specific options.
        """

    def __call__(self, size_or_none=None, value_or_none=None):
        """Report progress.

        Parameters
        ----------
        size_or_none : int or None
            New total size, if provided.
        value_or_none : int or None
            New current progress value, if provided.

        Returns
        -------
        bool
            True to continue the operation, False to abort it.
        """

    def __enter__(self):
        """Enter the context manager.

        Returns
        -------
        Callback
            The callback itself.
        """

    def __exit__(self, *args):
        """Leave the context manager and release resources."""

    def set_size(self, size):
        """Set the total size/count used for progress tracking.

        Parameters
        ----------
        size : int
            Total size/count.
        """

    def absolute_update(self, value):
        """Set the current progress to an absolute value.

        Parameters
        ----------
        value : int
            Current progress value.
        """

    def relative_update(self, inc=1):
        """Advance the current progress by an increment.

        Parameters
        ----------
        inc : int
            Amount to add to the current progress.
        """

    def branched(self, path_1, path_2, **kwargs):
        """Create a child callback for a parallel sub-operation.

        Parameters
        ----------
        path_1 : str
            First operation path.
        path_2 : str
            Second operation path.
        **kwargs
            Additional options for the child callback.

        Returns
        -------
        Callback
            A new callback instance for the branched operation.
        """

    def close(self):
        """Close the callback and clean up any resources it holds."""

# Ready-to-use callback implementations for common progress display needs.
class TqdmCallback(Callback):
    """Progress-bar callback backed by the tqdm library."""

    def __init__(self, tqdm_kwargs=None, **kwargs):
        """Create a tqdm progress-bar callback.

        Parameters
        ----------
        tqdm_kwargs : dict, optional
            Keyword arguments forwarded to the tqdm constructor.
        **kwargs
            Remaining options passed to the base ``Callback``.
        """
class DotPrinterCallback(Callback):
    """Simple progress callback that prints dots."""

    def __init__(self, **kwargs):
        """Create a dot-printing callback.

        Parameters
        ----------
        **kwargs
            Options forwarded to the base ``Callback``.
        """
class NoOpCallback(Callback):
"""No-operation callback that does nothing."""
def __init__(self, **kwargs):
"""Initialize no-op callback."""Pre-configured default callback instance for immediate use.
DEFAULT_CALLBACK: Callback
"""Default callback instance (NoOpCallback)"""

# Using default tqdm progress bar
callback = fsspec.callbacks.TqdmCallback()

# Download with progress bar
fs = fsspec.filesystem('s3')
fs.get('bucket/large-file.dat', 'local-file.dat', callback=callback)

# Upload with progress bar
fs.put('large-local-file.dat', 'bucket/uploaded-file.dat', callback=callback)

# Progress bar automatically closes when done
with fsspec.callbacks.TqdmCallback() as callback:
    fs.get('bucket/large-file.dat', 'local-file.dat', callback=callback)
# Progress bar disappears after completion

class CustomCallback(fsspec.Callback):
    """Example callback that prints percentage and transfer rate."""

    def __init__(self):
        super().__init__()
        self.start_time = time.time()

    def __call__(self, size_or_none=None, value_or_none=None):
        if size_or_none is not None:
            self.size = size_or_none
        if value_or_none is not None:
            self.value = value_or_none
        # Only report once both total size and current value are known
        if hasattr(self, 'size') and hasattr(self, 'value'):
            percent = (self.value / self.size) * 100
            elapsed = time.time() - self.start_time
            rate = self.value / elapsed if elapsed > 0 else 0
            print(f"\rProgress: {percent:.1f}% ({self.value}/{self.size}) "
                  f"Rate: {rate:.1f} bytes/sec", end='')
        return True  # Continue operation

# Use custom callback
callback = CustomCallback()
fs.get('bucket/file.dat', 'local.dat', callback=callback)

import fsspec
# Callback for multiple file operations
callback = fsspec.callbacks.TqdmCallback()

files_to_download = [
    ('bucket/file1.dat', 'local1.dat'),
    ('bucket/file2.dat', 'local2.dat'),
    ('bucket/file3.dat', 'local3.dat'),
]

# Set total size for all files
total_size = sum(fs.size(remote) for remote, local in files_to_download)
callback.set_size(total_size)

# Download files with cumulative progress
for remote, local in files_to_download:
    fs.get(remote, local, callback=callback)

# Main callback for overall progress
main_callback = fsspec.callbacks.TqdmCallback()

# Create branched callbacks for parallel operations
branch1 = main_callback.branched('operation1', 'operation2')
branch2 = main_callback.branched('operation1', 'operation2')

# Use in parallel operations (conceptual - actual parallelism depends on implementation)
import threading

def download_file(remote, local, callback):
    fs.get(remote, local, callback=callback)

thread1 = threading.Thread(target=download_file,
                           args=('bucket/file1.dat', 'local1.dat', branch1))
thread2 = threading.Thread(target=download_file,
                           args=('bucket/file2.dat', 'local2.dat', branch2))
thread1.start()
thread2.start()
thread1.join()
thread2.join()

class CancellableCallback(fsspec.Callback):
    """Example callback that lets the user abort a running transfer."""

    def __init__(self):
        super().__init__()
        self.cancelled = False

    def cancel(self):
        self.cancelled = True

    def __call__(self, size_or_none=None, value_or_none=None):
        # Update progress
        if size_or_none is not None:
            self.size = size_or_none
        if value_or_none is not None:
            self.value = value_or_none
        # Check for cancellation
        if self.cancelled:
            print("Operation cancelled by user")
            return False  # Abort operation
        # Display progress
        if hasattr(self, 'size') and hasattr(self, 'value'):
            percent = (self.value / self.size) * 100
            print(f"\rProgress: {percent:.1f}%", end='')
        return True  # Continue operation

# Usage with cancellation
callback = CancellableCallback()

# Start operation in background
import threading

def download_with_callback():
    try:
        fs.get('bucket/very-large-file.dat', 'local.dat', callback=callback)
        print("\nDownload completed")
    except Exception as e:
        print(f"\nDownload failed: {e}")

download_thread = threading.Thread(target=download_with_callback)
download_thread.start()

# Cancel after 30 seconds
time.sleep(30)
callback.cancel()
download_thread.join()

callback = fsspec.callbacks.TqdmCallback()
# File copy with progress
fs.copy('bucket/source.dat', 'bucket/backup.dat', callback=callback)

# Directory sync with progress
fs.get('bucket/data/', 'local-data/', recursive=True, callback=callback)

# Bulk upload with progress
fs.put('local-data/', 'bucket/uploaded-data/', recursive=True, callback=callback)

# Multiple file operations
files = fs.glob('bucket/data/*.csv')
callback.set_size(len(files))
for i, file_path in enumerate(files):
    content = fs.cat_file(file_path)
    process_data(content)
    callback.absolute_update(i + 1)

# Customize tqdm appearance and behavior
tqdm_options = {
    'unit': 'B',
    'unit_scale': True,
    'unit_divisor': 1024,
    'desc': 'Downloading',
    'ncols': 80,
    'ascii': True
}
callback = fsspec.callbacks.TqdmCallback(tqdm_kwargs=tqdm_options)

# Results in progress bar like:
# Downloading: 45%|████▌ | 450MB/1.00GB [00:30<00:25, 15.0MB/s]
fs.get('bucket/large-file.dat', 'local.dat', callback=callback)

import logging
class LoggingCallback(fsspec.Callback):
    """Example callback that emits progress via the logging module."""

    def __init__(self, log_interval=1024*1024):  # Log every MB
        super().__init__()
        self.log_interval = log_interval
        self.last_logged = 0
        self.logger = logging.getLogger(__name__)

    def __call__(self, size_or_none=None, value_or_none=None):
        if size_or_none is not None:
            self.size = size_or_none
            self.logger.info(f"Starting operation, total size: {self.size} bytes")
        if value_or_none is not None:
            self.value = value_or_none
            # Log at intervals
            if self.value - self.last_logged >= self.log_interval:
                if hasattr(self, 'size'):
                    percent = (self.value / self.size) * 100
                    self.logger.info(f"Progress: {percent:.1f}% ({self.value}/{self.size} bytes)")
                self.last_logged = self.value
        return True

# Use logging callback
logging.basicConfig(level=logging.INFO)
callback = LoggingCallback()
fs.get('bucket/file.dat', 'local.dat', callback=callback)

def on_start(callback):
    print("Transfer started")

def on_complete(callback):
    print("Transfer completed successfully")

def on_error(callback, error):
    print(f"Transfer failed: {error}")

# Callbacks can use hooks for event handling
hooks = {
    'start': on_start,
    'complete': on_complete,
    'error': on_error
}
callback = fsspec.Callback(hooks=hooks)
# Implementation would call hooks at appropriate timesclass RobustCallback(fsspec.Callback):
def __call__(self, size_or_none=None, value_or_none=None):
try:
# Update progress display
self.update_display(size_or_none, value_or_none)
return True
except Exception as e:
# Log error but don't fail the transfer
logging.error(f"Callback error: {e}")
return True # Continue operation despite callback errorInstall with Tessl CLI
npx tessl i tessl/pypi-fsspec