
tessl/pypi-fsspec

Unified pythonic interface for diverse file systems and storage backends


Utilities and Configuration

Helper functions for URL parsing, path manipulation, tokenization, and configuration management that support the core filesystem operations. Most of them live in the fsspec.utils module; the configuration helpers live in fsspec.config.

Capabilities

URL and Path Processing

Functions for parsing URLs, extracting protocols, and manipulating filesystem paths across different storage backends.

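def infer_storage_options(urlpath, inherit_storage_options=None):
    """
    Infer storage options from a URL path and merge them with inherited options.
    
    Parameters:
    - urlpath: str, URL or local path
    - inherit_storage_options: dict, existing options to merge in; a value that conflicts with one parsed from the URL raises KeyError
    
    Returns:
    dict, parsed components such as 'protocol' and 'path', plus 'host', 'username', 'password', and 'url_query' when present in the URL
    """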

def get_protocol(url):
    """
    Extract protocol from URL.
    
    Parameters:
    - url: str, URL to parse
    
    Returns:
    str, protocol name (e.g., 's3', 'gcs', 'file')
    """

def stringify_path(filepath):
    """
    Convert path object to string.
    
    Parameters:
    - filepath: str or Path-like, file path
    
    Returns:
    str, string representation of path
    """

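The parsing described above can be sketched with the standard library alone. This is an illustrative stand-in, not fsspec's implementation: `split_url` is a hypothetical helper, and the example URL and its `versionId` parameter are made up.

```python
from urllib.parse import urlsplit


def split_url(url: str) -> dict:
    """Hypothetical helper: break a URL into protocol, path, and raw query."""
    if "://" not in url:
        # No scheme present, so treat the input as a local path
        return {"protocol": "file", "path": url}
    parts = urlsplit(url)
    result = {"protocol": parts.scheme, "path": parts.netloc + parts.path}
    if parts.query:
        # Keep the query string raw instead of expanding it into keys
        result["url_query"] = parts.query
    return result


info = split_url("s3://bucket/data/file.csv?versionId=abc")
local = split_url("/tmp/report.txt")
print(info)
print(local)
```

Keeping the query string raw leaves it to each backend to decide what the parameters mean.
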
Compression Detection

Utilities for automatically detecting compression formats from filenames and extensions.

def infer_compression(filename):
    """
    Infer compression format from filename.
    
    Parameters:
    - filename: str, file name or path
    
    Returns:
    str or None, compression format name or None if uncompressed
    """

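Extension-based detection amounts to a table lookup on the final suffix. The sketch below assumes a small hypothetical table (`EXTENSION_TO_COMPRESSION`); the real table depends on which compression backends are installed.

```python
import os

# Hypothetical extension table for illustration only
EXTENSION_TO_COMPRESSION = {"gz": "gzip", "bz2": "bz2", "xz": "xz", "zip": "zip"}


def guess_compression(filename: str):
    """Return a compression name based on the last extension, or None."""
    extension = os.path.splitext(filename)[-1].lstrip(".").lower()
    return EXTENSION_TO_COMPRESSION.get(extension)


for name in ["data.csv.gz", "archive.tar.BZ2", "config.json", "README"]:
    print(name, "->", guess_compression(name))
```

Only the last suffix is consulted, so `archive.tar.bz2` is reported as bz2 and the `.tar` layer is left to the caller.
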
Tokenization and Hashing

Functions for generating consistent hash tokens from filesystem paths and parameters, used internally for caching and deduplication.

def tokenize(*args, **kwargs):
    """
    Generate hash token from arguments.
    
    Parameters:
    - *args: positional arguments to hash
    - **kwargs: keyword arguments to hash
    
    Returns:
    str, hash token string
    """

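One way to picture a deterministic token is as a hash over the string form of the arguments. The sketch below is an assumption-laden stand-in: `simple_token` is hypothetical, and the choice of SHA-256 is ours, not a statement about which hash fsspec uses.

```python
import hashlib


def simple_token(*args, **kwargs) -> str:
    """Hypothetical stand-in: hash the repr of the arguments into a hex token."""
    if kwargs:
        # Fold keyword arguments into the hashed tuple
        args += (kwargs,)
    return hashlib.sha256(str(args).encode()).hexdigest()


token_a = simple_token("s3", "bucket", "file.txt", region="us-east-1")
token_b = simple_token("s3", "bucket", "file.txt", region="us-east-1")
token_c = simple_token("s3", "bucket", "file.txt", region="us-west-2")
print(token_a == token_b, token_a == token_c)
```

Because the token derives from the string form of the arguments, keyword order and object representations affect the result in a scheme like this.
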
Block Reading Utilities

Low-level utilities for reading data blocks with delimiter support, useful for implementing custom file readers and parsers.

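def read_block(f, offset, length, delimiter=None, split_before=False):
    """
    Read a block of data from file.
    
    Parameters:
    - f: file-like object, source file
    - offset: int, byte offset to start reading
    - length: int, number of bytes to read; with a delimiter the block is adjusted to delimiter boundaries and may differ in size
    - delimiter: bytes, delimiter on which the block's start and end are aligned (optional)
    - split_before: bool, whether each block starts with the delimiter rather than ending with it
    
    Returns:
    bytes, block data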
    """

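Delimiter alignment can be sketched as follows: the start moves forward to just past the next delimiter (unless the offset is 0), and the end does the same from `offset + length`. Both functions here are simplified, hypothetical stand-ins written for this page, not fsspec's code.

```python
import io


def seek_past_delimiter(f, delimiter: bytes, chunk: int = 4096) -> None:
    """Advance f to just after the next delimiter, or leave it at EOF."""
    start = f.tell()
    buffer = b""
    while True:
        data = f.read(chunk)
        if not data:
            return  # no further delimiter; position stays at or past EOF
        buffer += data
        index = buffer.find(delimiter)
        if index >= 0:
            f.seek(start + index + len(delimiter))
            return


def read_aligned_block(f, offset: int, length: int, delimiter: bytes) -> bytes:
    """Hypothetical simplified reader returning whole delimited records."""
    f.seek(offset)
    if offset > 0:
        seek_past_delimiter(f, delimiter)
    start = f.tell()
    f.seek(offset + length)
    seek_past_delimiter(f, delimiter)
    end = f.tell()
    f.seek(start)
    return f.read(end - start)


data = b"alpha\nbeta\ngamma\ndelta\n"
source = io.BytesIO(data)
blocks = [read_aligned_block(source, off, 8, b"\n") for off in range(0, len(data), 8)]
print(blocks)
```

Stepping the offset by a fixed stride yields blocks that tile the file without overlap, which is why callers advance by the stride rather than by the length of the returned block.
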
Filename Generation

Utilities for generating systematic filenames for batch operations and parallel processing.

def build_name_function(max_int):
    """
    Build function for generating sequential filenames.
    
    Parameters:
    - max_int: int, maximum number to generate names for
    
    Returns:
    callable, function that takes int and returns filename string
    """

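Sequential names sort correctly when every number is zero-padded to the same width. A minimal sketch of that idea, using a hypothetical `make_name_function` rather than the library function:

```python
import math


def make_name_function(max_int: int):
    """Hypothetical stand-in: return a function that zero-pads integers."""
    # The small epsilon gives exact powers of ten (10, 100, 1000) an extra digit
    pad_length = int(math.ceil(math.log10(max_int + 1e-8)))

    def name_function(i: int) -> str:
        return str(i).zfill(pad_length)

    return name_function


names = [make_name_function(57)(i) for i in (7, 31)]
print(names)
print(make_name_function(1000)(42), make_name_function(999)(42))
```

Padding keeps lexicographic order equal to numeric order, so a directory listing returns the parts in the order they were written.
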
Atomic File Operations

Utilities for atomic writes to local files: data goes to a temporary file next to the target, which replaces the target only if the write completes without error.

def atomic_write(path, mode='wb'):
    """
    Context manager for atomic file writing.
    
    Parameters:
    - path: str, target file path
    - mode: str, file opening mode
    
    Returns:
    context manager, yields temporary file object
    """

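The write-then-replace pattern can be sketched with the standard library. `atomic_write_sketch` is a hypothetical name and a simplified illustration under the assumption of a local filesystem, not the library's code.

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def atomic_write_sketch(path: str, mode: str = "wb"):
    """Write to a temporary sibling file, then replace the target on success."""
    directory = os.path.dirname(path) or "."
    fd, tmp_name = tempfile.mkstemp(dir=directory, prefix=os.path.basename(path) + "-")
    try:
        with open(fd, mode) as handle:
            yield handle
    except BaseException:
        # Discard the partial temporary file and leave the target untouched
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_name)
        raise
    else:
        os.replace(tmp_name, path)


with tempfile.TemporaryDirectory() as workdir:
    target = os.path.join(workdir, "settings.txt")
    with atomic_write_sketch(target, "w") as handle:
        handle.write("version=1\n")
    try:
        with atomic_write_sketch(target, "w") as handle:
            handle.write("version=2\n")
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    with open(target) as handle:
        final_content = handle.read()
    leftover_files = os.listdir(workdir)

print(final_content, leftover_files)
```

Creating the temporary file in the target's own directory keeps the final `os.replace` on one filesystem, which is what allows the swap to be atomic.
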
Pattern Matching

Utilities for translating glob patterns to regular expressions and other pattern matching operations.

def glob_translate(pat):
    """
    Translate glob pattern to regular expression.
    
    Parameters:
    - pat: str, glob pattern
    
    Returns:
    str, regular expression pattern
    """

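For comparison, the standard library offers `fnmatch.translate`, which performs a similar glob-to-regex conversion. It is shown here only as a related technique: unlike path-aware glob translation, its `*` also matches path separators.

```python
import fnmatch
import re

# fnmatch.translate is a standard-library function, not part of fsspec
pattern = re.compile(fnmatch.translate("data_*.csv"))
files = ["data_1.csv", "config.json", "data_old.csv"]
matched = [name for name in files if pattern.match(name)]
print(matched)

# With fnmatch, '*' crosses directory boundaries
crosses_separator = re.match(fnmatch.translate("*.txt"), "logs/a/b.txt") is not None
print(crosses_separator)
```
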
Configuration Management

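Global configuration for default filesystem constructor arguments. These names live in the fsspec.config module, not fsspec.utils.

conf: dict
    """Global configuration dictionary, keyed by protocol name; each value is a dict of default keyword arguments for that filesystem"""

conf_dir: str
    """Configuration directory path"""

def set_conf_env(conf_dict, envdict=os.environ):
    """
    Set configuration from environment variables.
    
    Reads variables named FSSPEC_<PROTOCOL> (value parsed as a JSON dict) and
    FSSPEC_<PROTOCOL>_<KWARG> (value kept as a string, key lower-cased).
    
    Parameters:
    - conf_dict: dict, configuration dictionary to update
    - envdict: dict, environment variables dictionary
    """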

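def apply_config(cls, kwargs, conf_dict=None):
    """
    Supply configured default values for class constructor arguments.
    
    Parameters:
    - cls: type, filesystem class whose protocol attribute selects the config entries
    - kwargs: dict, explicitly passed keyword arguments (these take precedence)
    - conf_dict: dict, configuration to read; defaults to the global conf
    
    Returns:
    dict, keyword arguments with configured defaults filled in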
    """

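The environment-variable convention and the precedence rule can be sketched without the library. Both helpers below (`config_from_env`, `with_defaults`) are hypothetical stand-ins, and the variable values are made up for illustration.

```python
import json


def config_from_env(environ: dict) -> dict:
    """Hypothetical stand-in: build a protocol-keyed config from FSSPEC_* variables."""
    config: dict = {}
    prefix = "FSSPEC_"
    names = [key[len(prefix):] for key in environ if key.startswith(prefix)]
    # First pass: FSSPEC_<PROTOCOL> carries a JSON dict of keyword arguments
    for name in names:
        if name and "_" not in name:
            value = json.loads(environ[prefix + name])
            if isinstance(value, dict):
                config.setdefault(name.lower(), {}).update(value)
    # Second pass: FSSPEC_<PROTOCOL>_<KWARG> carries one raw string value
    for name in names:
        if "_" in name and not name.startswith("_"):
            protocol, kwarg = name.split("_", 1)
            config.setdefault(protocol.lower(), {})[kwarg.lower()] = environ[prefix + name]
    return config


def with_defaults(protocol: str, config: dict, kwargs: dict) -> dict:
    """Fill in configured defaults; explicit arguments take precedence."""
    merged = dict(config.get(protocol, {}))
    merged.update(kwargs)
    return merged


example_env = {
    "FSSPEC_FILE": '{"auto_mkdir": true}',
    "FSSPEC_HTTP_BLOCK_SIZE": "1048576",
    "HOME": "/home/user",
}
example_config = config_from_env(example_env)
print(example_config)
print(with_defaults("file", example_config, {"auto_mkdir": False}))
```

Note that single-value variables stay strings, so the JSON form is the safer choice when a typed value such as an integer or boolean is needed.
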
Usage Patterns

URL Parameter Extraction

import fsspec
from fsspec.utils import infer_storage_options

# Split a URL into protocol, path, and connection details
url = 's3://bucket/path?versionId=abc123'
storage_options = infer_storage_options(url)
print(storage_options)
# The result includes 'protocol' ('s3'), 'path' ('bucket/path'), 'host', and
# 'url_query' holding the raw query string. Query parameters are not
# expanded into separate keys.

# Pass credentials to the filesystem explicitly rather than in the URL
fs = fsspec.filesystem('s3', key='ACCESS_KEY', secret='SECRET_KEY')

# Merge in inherited options; a key that conflicts with a URL-derived value raises KeyError
base_options = {'timeout': 30}
final_options = infer_storage_options(url, inherit_storage_options=base_options)
# Result: the URL-derived keys plus 'timeout': 30

Protocol Detection

# Extract protocol from various URL formats
urls = [
    's3://bucket/file.txt',
    'gcs://bucket/file.txt', 
    'https://example.com/api',
    '/local/path/file.txt',
    'file:///absolute/path'
]

for url in urls:
    protocol = fsspec.utils.get_protocol(url)
    print(f"{url} -> {protocol}")

# s3://bucket/file.txt -> s3
# gcs://bucket/file.txt -> gcs  
# https://example.com/api -> https
# /local/path/file.txt -> file
# file:///absolute/path -> file

Compression Auto-Detection

# Automatically detect compression from filenames
filenames = [
    'data.csv.gz',
    'archive.tar.bz2',
    'logs.txt.xz',
    'config.json',
    'model.pkl.lz4'
]

for filename in filenames:
    compression = fsspec.utils.infer_compression(filename)
    print(f"{filename} -> {compression}")

# data.csv.gz -> gzip
# archive.tar.bz2 -> bz2
# logs.txt.xz -> xz
# config.json -> None
# model.pkl.lz4 -> lz4 (only when the optional lz4 package is installed, otherwise None)

Path Standardization

import pathlib

# Convert various path types to strings
paths = [
    '/local/file.txt',
    pathlib.Path('/local/file.txt'),
    pathlib.PurePosixPath('/local/file.txt')
]

for path in paths:
    str_path = fsspec.utils.stringify_path(path)
    print(f"{type(path)} -> {str_path}")

Tokenization for Caching

# Generate consistent tokens for caching
token1 = fsspec.utils.tokenize('s3', 'bucket', 'file.txt', region='us-east-1')
token2 = fsspec.utils.tokenize('s3', 'bucket', 'file.txt', region='us-east-1')
token3 = fsspec.utils.tokenize('s3', 'bucket', 'file.txt', region='us-west-2')

print(token1 == token2)  # True - same parameters
print(token1 == token3)  # False - different region

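# Use for cache keys (the values below are placeholders for your own context)
protocol, path, storage_options = 's3', 'bucket/file.txt', {'region': 'us-east-1'}
cache_key = fsspec.utils.tokenize(protocol, path, **storage_options)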

Block Reading with Delimiters

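import os
import fsspec

# Read a file in blocks aligned to line boundaries
path = 'large_file.txt'
block_size = 1024 * 1024  # 1MB blocks
file_size = os.path.getsize(path)

with open(path, 'rb') as f:
    # Step by a fixed stride; read_block moves each block's start and end
    # forward to the next delimiter, so consecutive blocks do not overlap
    for offset in range(0, file_size, block_size):
        block = fsspec.utils.read_block(f, offset, block_size, delimiter=b'\n')

        # A block can be empty when a single line spans the whole stride
        for line in block.split(b'\n'):
            if line:  # Skip empty lines
                process_line(line)  # process_line is a placeholder for your own handler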

Sequential Filename Generation

# Generate systematic filenames for batch output
name_func = fsspec.utils.build_name_function(1000)

filenames = [name_func(i) for i in range(5)]
print(filenames)
# ['0000', '0001', '0002', '0003', '0004']  (1000 itself needs four digits)

# Use with fsspec.open_files for multiple outputs
output_files = fsspec.open_files(
    'output-*.json',
    'w',
    num=10,
    name_function=name_func
)

Atomic File Writing

# Ensure atomic writes to prevent corruption
with fsspec.utils.atomic_write('/important/file.txt', 'w') as f:
    f.write('Critical data that must be written atomically\n')
    f.write('If this fails, the original file remains unchanged\n')
    # File is only moved to final location if all writes succeed

# Works with binary mode too (model is a placeholder for any picklable object)
import pickle
with fsspec.utils.atomic_write('/data/model.pkl', 'wb') as f:
    pickle.dump(model, f)

Glob Pattern Processing

# Convert glob patterns to regex for custom matching
patterns = ['*.txt', 'data_*.csv', 'logs/*/error.log']

for pattern in patterns:
    regex = fsspec.utils.glob_translate(pattern)
    print(f"{pattern} -> {regex}")

# Use compiled regex for matching
import re
regex_pattern = fsspec.utils.glob_translate('data_*.csv')
compiled = re.compile(regex_pattern)

files = ['data_1.csv', 'data_2.csv', 'config.json', 'data_old.csv']
matches = [f for f in files if compiled.match(f)]
print(matches)  # ['data_1.csv', 'data_2.csv', 'data_old.csv']

Global Configuration

import os
import fsspec
import fsspec.config

# Check current configuration (a dict keyed by protocol name)
print("Current fsspec config:", fsspec.config.conf)

# Set default constructor arguments for a protocol
fsspec.config.conf.setdefault('file', {})['auto_mkdir'] = True

# Configuration from environment variables
# FSSPEC_<PROTOCOL> holds a JSON dict; FSSPEC_<PROTOCOL>_<KWARG> holds one string value
os.environ['FSSPEC_FILE'] = '{"auto_mkdir": true}'
os.environ['FSSPEC_HTTP_BLOCK_SIZE'] = '2097152'  # stored as the string '2097152'

fsspec.config.set_conf_env(fsspec.config.conf)
print("Updated config:", fsspec.config.conf)

Custom Utility Functions

def get_file_info(url):
    """Get comprehensive file information from URL."""
    protocol = fsspec.utils.get_protocol(url)
    storage_options = fsspec.utils.infer_storage_options(url)
    # Infer compression from the parsed path; a trailing query string
    # would otherwise hide the file extension
    compression = fsspec.utils.infer_compression(storage_options['path'])
    
    return {
        'protocol': protocol,
        'compression': compression,
        'storage_options': storage_options,
        'token': fsspec.utils.tokenize(url, **storage_options)
    }

# Use custom utility
info = get_file_info('s3://bucket/data.csv.gz?region=us-west-2')
print(info)

Error Handling with Utilities

def safe_infer_compression(filename):
    """Safely infer compression with fallback."""
    try:
        return fsspec.utils.infer_compression(filename)
    except Exception:
        # Return None if compression inference fails
        return None

def safe_get_protocol(url):
    """Safely extract protocol with fallback."""
    try:
        return fsspec.utils.get_protocol(url)
    except Exception:
        # Default to file protocol
        return 'file'

Performance Optimization with Utilities

# Cache tokenization results for repeated operations
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_tokenize(*args, **kwargs):
    """Cached version of tokenize for performance."""
    # Sort kwargs for consistent hashing
    sorted_kwargs = tuple(sorted(kwargs.items()))
    return fsspec.utils.tokenize(*args, *sorted_kwargs)

# Use cached tokenization
token = cached_tokenize('s3', 'bucket', 'file.txt', region='us-east-1')

Configuration Options

Global Settings

# fsspec.config.conf is keyed by protocol; each value holds default keyword
# arguments for that filesystem's constructor. The entries below are
# illustrative examples, not built-in defaults.
{
    'file': {'auto_mkdir': True},           # Local files: create parent directories on write
    'http': {'block_size': 1024 * 1024},    # HTTP: read block size (1MB)
}

Environment Variable Mapping

# Environment variables that affect fsspec configuration
FSSPEC_<PROTOCOL> -> conf['<protocol>'], value parsed as a JSON dict
FSSPEC_<PROTOCOL>_<KWARG> -> conf['<protocol>']['<kwarg>'], value kept as a string
FSSPEC_CONFIG_DIR -> directory searched for configuration files

Per-Filesystem Configuration

# Defaults are keyed by protocol and merged into constructor keyword arguments
import fsspec
from fsspec.config import apply_config

config_overrides = {
    'file': {'auto_mkdir': True},
    'http': {'block_size': 2*1024*1024}
}

# apply_config returns the merged kwargs; explicitly passed kwargs take precedence.
# get_filesystem_class imports the backend, so 'http' needs its optional dependencies.
for protocol in config_overrides:
    cls = fsspec.get_filesystem_class(protocol)
    kwargs = apply_config(cls, {}, conf_dict=config_overrides)
    print(protocol, kwargs)

Install with Tessl CLI

npx tessl i tessl/pypi-fsspec
