Unified pythonic interface for diverse file systems and storage backends
—
Helper functions for URL parsing, path manipulation, tokenization, and configuration management that support the core filesystem operations. These utilities provide essential infrastructure for protocol handling, caching, and system configuration.
Functions for parsing URLs, extracting protocols, and manipulating filesystem paths across different storage backends.
def infer_storage_options(urlpath, inherit_storage_options=None):
"""
Infer storage options from URL parameters.
Parameters:
- urlpath: str, URL with potential query parameters
- inherit_storage_options: dict, existing options to inherit/override
Returns:
dict, storage options extracted from URL
"""
def get_protocol(url):
"""
Extract protocol from URL.
Parameters:
- url: str, URL to parse
Returns:
str, protocol name (e.g., 's3', 'gcs', 'file')
"""
def stringify_path(filepath):
"""
Convert path object to string.
Parameters:
- filepath: str or Path-like, file path
Returns:
str, string representation of path
"""

Utilities for automatically detecting compression formats from filenames and extensions.
def infer_compression(filename):
"""
Infer compression format from filename.
Parameters:
- filename: str, file name or path
Returns:
str or None, compression format name or None if uncompressed
"""

Functions for generating consistent hash tokens from filesystem paths and parameters, used internally for caching and deduplication.
def tokenize(*args, **kwargs):
"""
Generate hash token from arguments.
Parameters:
- *args: positional arguments to hash
- **kwargs: keyword arguments to hash
Returns:
str, hash token string
"""

Low-level utilities for reading data blocks with delimiter support, useful for implementing custom file readers and parsers.
def read_block(file, offset, length, delimiter=None):
"""
Read a block of data from file.
Parameters:
- file: file-like object, source file
- offset: int, byte offset to start reading
- length: int, maximum bytes to read
- delimiter: bytes, delimiter to read until (optional)
Returns:
bytes, block data
"""

Utilities for generating systematic filenames for batch operations and parallel processing.
def build_name_function(max_int):
"""
Build function for generating sequential filenames.
Parameters:
- max_int: int, maximum number to generate names for
Returns:
callable, function that takes int and returns filename string
"""

Utilities for ensuring atomic file writes and preventing data corruption during file operations.
def atomic_write(path, mode='wb'):
"""
Context manager for atomic file writing.
Parameters:
- path: str, target file path
- mode: str, file opening mode
Returns:
context manager, yields temporary file object
"""

Utilities for translating glob patterns to regular expressions and other pattern matching operations.
def glob_translate(pat):
"""
Translate glob pattern to regular expression.
Parameters:
- pat: str, glob pattern
Returns:
str, regular expression pattern
"""

Global configuration system for fsspec behavior and default settings.
conf: dict
"""Global configuration dictionary with fsspec settings"""
conf_dir: str
"""Configuration directory path"""
def set_conf_env(conf_dict, envdict=os.environ):
"""
Set configuration from environment variables.
Parameters:
- conf_dict: dict, configuration dictionary to update
- envdict: dict, environment variables dictionary
"""
def apply_config(cls, kwargs):
"""
Apply configuration to class constructor arguments.
Parameters:
- cls: type, class to configure
- kwargs: dict, keyword arguments to modify
Returns:
dict, modified keyword arguments with config applied
"""

# Extract storage options from URL query parameters
url = 's3://bucket/path?key=ACCESS_KEY&secret=SECRET_KEY&region=us-west-2'
storage_options = fsspec.utils.infer_storage_options(url)
print(storage_options)
# {'key': 'ACCESS_KEY', 'secret': 'SECRET_KEY', 'region': 'us-west-2'}
# Use extracted options
fs = fsspec.filesystem('s3', **storage_options)
# Inherit and override options
base_options = {'key': 'BASE_KEY', 'timeout': 30}
url = 's3://bucket/path?secret=SECRET_KEY'
final_options = fsspec.utils.infer_storage_options(url, base_options)
# Result: {'key': 'BASE_KEY', 'timeout': 30, 'secret': 'SECRET_KEY'}

# Extract protocol from various URL formats
urls = [
's3://bucket/file.txt',
'gcs://bucket/file.txt',
'https://example.com/api',
'/local/path/file.txt',
'file:///absolute/path'
]
for url in urls:
protocol = fsspec.utils.get_protocol(url)
print(f"{url} -> {protocol}")
# s3://bucket/file.txt -> s3
# gcs://bucket/file.txt -> gcs
# https://example.com/api -> https
# /local/path/file.txt -> file
# file:///absolute/path -> file

# Automatically detect compression from filenames
filenames = [
'data.csv.gz',
'archive.tar.bz2',
'logs.txt.xz',
'config.json',
'model.pkl.lz4'
]
for filename in filenames:
compression = fsspec.utils.infer_compression(filename)
print(f"{filename} -> {compression}")
# data.csv.gz -> gzip
# archive.tar.bz2 -> bz2
# logs.txt.xz -> lzma
# config.json -> None
# model.pkl.lz4 -> lz4

import pathlib
# Convert various path types to strings
paths = [
'/local/file.txt',
pathlib.Path('/local/file.txt'),
pathlib.PurePosixPath('/local/file.txt')
]
for path in paths:
str_path = fsspec.utils.stringify_path(path)
print(f"{type(path)} -> {str_path}")

# Generate consistent tokens for caching
token1 = fsspec.utils.tokenize('s3', 'bucket', 'file.txt', region='us-east-1')
token2 = fsspec.utils.tokenize('s3', 'bucket', 'file.txt', region='us-east-1')
token3 = fsspec.utils.tokenize('s3', 'bucket', 'file.txt', region='us-west-2')
print(token1 == token2) # True - same parameters
print(token1 == token3) # False - different region
# Use for cache keys
cache_key = fsspec.utils.tokenize(protocol, path, **storage_options)

# Read file in blocks with line boundaries
with open('large_file.txt', 'rb') as f:
offset = 0
block_size = 1024 * 1024 # 1MB blocks
while True:
# Read block ending at line boundary
block = fsspec.utils.read_block(f, offset, block_size, delimiter=b'\n')
if not block:
break
# Process complete lines
lines = block.split(b'\n')
for line in lines:
if line: # Skip empty lines
process_line(line)
offset += len(block)

# Generate systematic filenames for batch output
name_func = fsspec.utils.build_name_function(1000)
filenames = [name_func(i) for i in range(5)]
print(filenames)
# ['000', '001', '002', '003', '004']
# Use with fsspec.open_files for multiple outputs
output_files = fsspec.open_files(
'output-*.json',
'w',
num=10,
name_function=name_func
)

# Ensure atomic writes to prevent corruption
with fsspec.utils.atomic_write('/important/file.txt', 'w') as f:
f.write('Critical data that must be written atomically\n')
f.write('If this fails, the original file remains unchanged\n')
# File is only moved to final location if all writes succeed
# Works with binary mode too
with fsspec.utils.atomic_write('/data/model.pkl', 'wb') as f:
pickle.dump(model, f)

# Convert glob patterns to regex for custom matching
patterns = ['*.txt', 'data_*.csv', 'logs/*/error.log']
for pattern in patterns:
regex = fsspec.utils.glob_translate(pattern)
print(f"{pattern} -> {regex}")
# Use compiled regex for matching
import re
regex_pattern = fsspec.utils.glob_translate('data_*.csv')
compiled = re.compile(regex_pattern)
files = ['data_1.csv', 'data_2.csv', 'config.json', 'data_old.csv']
matches = [f for f in files if compiled.match(f)]
print(matches) # ['data_1.csv', 'data_2.csv', 'data_old.csv']

# Check current configuration
print("Current fsspec config:", fsspec.config.conf)
# Set configuration options
fsspec.config.conf['default_cache_type'] = 'blockcache'
fsspec.config.conf['default_block_size'] = 1024 * 1024
# Configuration from environment variables
import os
os.environ['FSSPEC_CACHE_TYPE'] = 'readahead'
os.environ['FSSPEC_BLOCK_SIZE'] = '2097152'
fsspec.utils.set_conf_env(fsspec.config.conf)
print("Updated config:", fsspec.config.conf)

def get_file_info(url):
"""Get comprehensive file information from URL."""
protocol = fsspec.utils.get_protocol(url)
compression = fsspec.utils.infer_compression(url)
storage_options = fsspec.utils.infer_storage_options(url)
return {
'protocol': protocol,
'compression': compression,
'storage_options': storage_options,
'token': fsspec.utils.tokenize(url, **storage_options)
}
# Use custom utility
info = get_file_info('s3://bucket/data.csv.gz?region=us-west-2')
print(info)

def safe_infer_compression(filename):
"""Safely infer compression with fallback."""
try:
return fsspec.utils.infer_compression(filename)
except Exception:
# Return None if compression inference fails
return None
def safe_get_protocol(url):
"""Safely extract protocol with fallback."""
try:
return fsspec.utils.get_protocol(url)
except Exception:
# Default to file protocol
return 'file'

# Cache tokenization results for repeated operations
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_tokenize(*args, **kwargs):
"""Cached version of tokenize for performance."""
# Sort kwargs for consistent hashing
sorted_kwargs = tuple(sorted(kwargs.items()))
return fsspec.utils.tokenize(*args, *sorted_kwargs)
# Use cached tokenization
token = cached_tokenize('s3', 'bucket', 'file.txt', region='us-east-1')

# Common configuration options in fsspec.config.conf
{
'default_cache_type': 'readahead', # Default cache strategy
'default_block_size': 1024 * 1024, # Default block size (1MB)
'connect_timeout': 10, # Connection timeout seconds
'read_timeout': 30, # Read timeout seconds
'max_connections': 100, # Max concurrent connections
'cache_dir': '/tmp/fsspec', # Cache directory
'logging_level': 'INFO' # Logging verbosity
}

# Environment variables that affect fsspec behavior
FSSPEC_CACHE_TYPE -> conf['default_cache_type']
FSSPEC_BLOCK_SIZE -> conf['default_block_size']
FSSPEC_TIMEOUT -> conf['connect_timeout']
FSSPEC_CACHE_DIR -> conf['cache_dir']

# Apply configuration to specific filesystem instances
config_overrides = {
's3': {'default_cache_type': 'mmap'},
'gcs': {'default_block_size': 2*1024*1024},
'http': {'connect_timeout': 5}
}
# Configuration is applied when creating filesystem instances
for protocol, overrides in config_overrides.items():
fsspec.utils.apply_config(fsspec.get_filesystem_class(protocol), overrides)

Install with Tessl CLI
npx tessl i tessl/pypi-fsspec