Lightweight pipelining with Python functions for disk-caching, parallel computing, and fast compressed persistence
—
Transparent disk-caching of function results using the memoize pattern. Provides automatic cache invalidation, configurable storage backends, and memory-mapped array support for handling large datasets efficiently in scientific computing and machine learning workflows.
from typing import Optional

Creates a caching context for functions with configurable storage location, compression, and backend options.
class Memory(Logger):
def __init__(self, location=None, backend="local", mmap_mode=None, compress=False, verbose=1, backend_options=None):
"""
Create a Memory context for caching function results.
Parameters:
- location: str or pathlib.Path, cache directory (None for no caching)
- backend: str, storage backend ("local" by default)
- mmap_mode: {None, 'r+', 'r', 'w+', 'c'}, memory mapping mode for arrays
- compress: bool or int, compression level (False, True, or 0-9)
- verbose: int, verbosity level (0=silent, 1=normal, 2=verbose)
- backend_options: dict, additional backend-specific options
"""
def cache(self, func=None, ignore=None, verbose=None, mmap_mode=False, cache_validation_callback=None):
"""
Decorator to cache function results to disk.
Parameters:
- func: callable, function to cache (or None for decorator usage)
- ignore: list of str, parameter names to ignore in cache key
- verbose: int, override Memory verbose level
- mmap_mode: bool or str, memory mapping mode for this function
- cache_validation_callback: callable, custom cache validation logic
Returns:
Decorated function or MemorizedFunc instance
"""
def clear(self, warn=True):
"""
Erase complete cache directory.
Parameters:
- warn: bool, warn before clearing cache
"""
def reduce_size(self, bytes_limit=None, items_limit=None, age_limit=None):
"""
Remove cache elements to fit within specified limits.
Parameters:
- bytes_limit: int, maximum cache size in bytes
- items_limit: int, maximum number of cached items
- age_limit: datetime.timedelta, maximum age of cached items
"""
def eval(self, func, *args, **kwargs):
"""
Evaluate function within Memory context.
Parameters:
- func: callable, function to evaluate
- *args, **kwargs: arguments to pass to function
Returns:
Function result (cached if applicable)
"""
# Properties
location: Optional[str] # Cache directory location
backend: str # Storage backend type
verbose: int # Verbosity level

Usage Example:
from joblib import Memory
import numpy as np
# Create memory context
mem = Memory(location='./cache', verbose=1)
# Cache expensive computation
@mem.cache
def compute_features(data, n_components=10):
"""Expensive feature computation."""
# Simulate expensive computation
result = np.random.random((len(data), n_components))
return result
# First call computes and caches
data = np.random.random(1000)
features = compute_features(data, n_components=20)
# Second call loads from cache
features = compute_features(data, n_components=20) # Fast!
# Clear specific function cache
compute_features.clear()
# Clear entire cache
mem.clear()

Manages individual cached computation results with access and cleanup capabilities.
class MemorizedResult:
def __init__(self, location, call_id, backend="local", mmap_mode=None, verbose=0, timestamp=None, metadata=None):
"""
Represent a cached computation result.
Parameters:
- location: str, cache location path
- call_id: str, unique identifier for the cached call
- backend: str, storage backend type
- mmap_mode: str, memory mapping mode
- verbose: int, verbosity level
- timestamp: float, cache creation timestamp
- metadata: dict, additional cache metadata
"""
def get(self):
"""
Read cached value and return it.
Returns:
Cached result object
"""
def clear(self):
"""
Clear this cached value from storage.
"""
# Properties
location: str # Cache location
func: str # Function name
args_id: str # Arguments identifier

Provides cache validation callbacks for time-based and custom invalidation logic.
def expires_after(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0):
"""
Cache validation callback to force recomputation after duration.
Parameters:
- days, seconds, microseconds, milliseconds, minutes, hours, weeks: int,
time duration components
Returns:
Validation callback function for use with Memory.cache()
"""Usage Example:
from joblib import Memory, expires_after
from datetime import timedelta
mem = Memory('./cache')
# Cache expires after 1 hour
@mem.cache(cache_validation_callback=expires_after(hours=1))
def fetch_data():
# This will be recomputed after 1 hour
return expensive_api_call()
# Custom validation callback
def custom_validator(metadata):
"""Custom cache validation logic."""
return metadata.get('version') == get_current_version()
@mem.cache(cache_validation_callback=custom_validator)
def process_with_version():
return process_data()

Extends Memory with custom storage backends for cloud storage, databases, or other persistence layers.
def register_store_backend(backend_name, backend):
"""
Register a new storage backend for Memory objects.
Parameters:
- backend_name: str, name identifying the backend
- backend: class, StoreBackendBase subclass implementation
Raises:
ValueError: If backend_name is not string or backend doesn't inherit StoreBackendBase
"""Usage Example:
from joblib import Memory, register_store_backend
from joblib._store_backends import StoreBackendBase
class S3StoreBackend(StoreBackendBase):
"""Example S3 storage backend."""
def __init__(self, bucket_name, **kwargs):
self.bucket_name = bucket_name
# S3 client initialization
def _open_item(self, f, mode):
# S3-specific file opening logic
pass
def _item_exists(self, location):
# S3-specific existence check
pass
# ... implement other required methods
# Register custom backend
register_store_backend('s3', S3StoreBackend)
# Use with Memory
mem = Memory(backend='s3', backend_options={'bucket_name': 'my-cache-bucket'})

Usage Example (memory-mapped arrays):
from joblib import Memory
import numpy as np
mem = Memory('./cache', mmap_mode='r')
@mem.cache
def create_large_array(size):
return np.random.random(size)
# Array is memory-mapped when loaded from cache
large_array = create_large_array((10000, 10000))

Usage Example (ignoring parameters in the cache key):
@mem.cache(ignore=['verbose', 'debug'])
def process_data(data, model_params, verbose=False, debug=False):
# 'verbose' and 'debug' don't affect cache key
return model.fit(data, **model_params)

Usage Example (limiting cache size):
# Limit cache to 1GB and 100 items
mem.reduce_size(bytes_limit=1024**3, items_limit=100)
# Remove items older than 7 days
from datetime import timedelta
mem.reduce_size(age_limit=timedelta(days=7))

Install with Tessl CLI
npx tessl i tessl/pypi-joblib