Lightweight pipelining with Python functions for disk-caching, parallel computing, and fast compressed persistence
—
Core utilities including object hashing, logging with timing, backend infrastructure, and compression management for extending joblib's functionality and integrating with scientific computing workflows.
Fast hash calculation for Python objects to create unique identifiers, used internally by Memory caching and available for custom caching implementations.
def hash(obj, hash_name="md5", coerce_mmap=False):
"""
Quick calculation of hash to identify Python objects uniquely.
Parameters:
- obj: any Python object to hash
- hash_name: str, hashing algorithm ("md5" or "sha1")
- coerce_mmap: bool, treat memory-mapped arrays as regular arrays
Returns:
str: hexadecimal hash string
Raises:
ValueError: if hash_name is not supported
"""Usage Examples:
from joblib import hash
import numpy as np
# Hash simple objects
hash_int = hash(42)
hash_str = hash("hello world")
hash_list = hash([1, 2, 3, 4])
print(f"Integer hash: {hash_int}")
print(f"String hash: {hash_str}")
# Hash NumPy arrays
array = np.random.random(1000)
array_hash = hash(array)
print(f"Array hash: {array_hash}")
# Different arrays with same content have same hash
array2 = array.copy()
assert hash(array) == hash(array2)
# Different hash algorithms
md5_hash = hash(array, hash_name="md5") # Default
sha1_hash = hash(array, hash_name="sha1")  # Alternative algorithm (note: neither MD5 nor SHA-1 is collision-resistant; they are used here for cache keys, not security)
# Memory-mapped arrays
mmap_array = np.memmap('temp.dat', dtype='float32', mode='w+', shape=(1000,))
mmap_array[:] = array[:]
# Hash memory-mapped array as regular array
regular_hash = hash(mmap_array, coerce_mmap=True)
mmap_hash = hash(mmap_array, coerce_mmap=False)
# Complex objects
complex_obj = {
'data': np.random.random((100, 50)),
'params': {'learning_rate': 0.01, 'epochs': 100},
'metadata': ['training', 'validation']
}
complex_hash = hash(complex_obj)

Utilities for handling non-serializable objects in parallel processing contexts.
def wrap_non_picklable_objects(obj, keep_wrapper=True):
"""
Wrap non-picklable objects to enable parallel processing.
Parameters:
- obj: object to wrap (may contain non-picklable elements)
- keep_wrapper: bool, whether to keep wrapper for round-trip compatibility
Returns:
Wrapped object that can be pickled and sent to parallel workers
"""Usage Examples:
from joblib import wrap_non_picklable_objects, Parallel, delayed
import sqlite3
# Example with non-picklable database connection
def create_db_connection():
return sqlite3.connect(':memory:')
def process_with_connection(data, connection):
# Use database connection in processing
cursor = connection.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS temp (value INTEGER)")
cursor.execute("INSERT INTO temp VALUES (?)", (data,))
return cursor.fetchall()
# Wrap non-picklable connection
connection = create_db_connection()
wrapped_connection = wrap_non_picklable_objects(connection)
# Use in parallel processing (connection will be recreated in each worker)
data_items = [1, 2, 3, 4, 5]
results = Parallel(n_jobs=2)(
delayed(process_with_connection)(item, wrapped_connection)
for item in data_items
)
# Custom objects with lambda functions or other non-picklable elements
class ProcessorWithLambda:
def __init__(self):
self.transform = lambda x: x ** 2 # Non-picklable lambda
processor = ProcessorWithLambda()
wrapped_processor = wrap_non_picklable_objects(processor)

Logging utilities with built-in timing capabilities for monitoring performance and debugging computational workflows.
class Logger:
def __init__(self, depth=3, name=None):
"""
Base logging class with formatting and timing capabilities.
Parameters:
- depth: int, call stack depth for logging context
- name: str, logger name (None for auto-generation)
"""
def warn(self, msg):
"""
Log a warning message.
Parameters:
- msg: str, warning message to log
"""
def info(self, msg):
"""
Log an informational message.
Parameters:
- msg: str, info message to log
"""
def debug(self, msg):
"""
Log a debug message.
Parameters:
- msg: str, debug message to log
"""
def format(self, obj, indent=0):
"""
Return formatted representation of object.
Parameters:
- obj: object to format
- indent: int, indentation level
Returns:
str: formatted object representation
"""
class PrintTime:
def __init__(self, logfile=None, logdir=None):
"""
Print and log messages with execution time tracking.
Parameters:
- logfile: str, path to log file (None for stdout)
- logdir: str, directory for log files (None for current dir)
"""Usage Examples:
from joblib import Logger, PrintTime
import time
import numpy as np
# Basic logging
logger = Logger(name="DataProcessor")
def process_data(data):
logger.info(f"Processing {len(data)} items")
if len(data) == 0:
logger.warn("Empty data provided")
return []
logger.debug(f"Data type: {type(data)}")
# Simulate processing
result = [x * 2 for x in data]
logger.info("Processing complete")
return result
# Process with logging
data = [1, 2, 3, 4, 5]
result = process_data(data)
# Time tracking with PrintTime
timer = PrintTime()
print("Starting computation...")
start_time = time.time()
# Simulate expensive computation
large_array = np.random.random((10000, 1000))
result = np.mean(large_array, axis=1)
elapsed = time.time() - start_time
print(f"Computation completed in {elapsed:.2f} seconds")
# Custom formatting
logger = Logger()
complex_data = {
'arrays': [np.random.random(100) for _ in range(3)],
'config': {'param1': 0.1, 'param2': 'test'},
'metadata': {'version': 1, 'created': time.time()}
}
formatted_output = logger.format(complex_data, indent=2)
print("Complex data structure:")
print(formatted_output)

Abstract base classes for implementing custom parallel execution and storage backends.
class ParallelBackendBase:
"""
Abstract base class for parallel execution backends.
Subclass this to implement custom parallel processing backends
for specialized computing environments or frameworks.
"""
# Backend capabilities
default_n_jobs = 1
supports_inner_max_num_threads = False
supports_retrieve_callback = False
supports_return_generator = False
supports_timeout = False
def effective_n_jobs(self, n_jobs):
"""
Determine actual number of parallel jobs.
Parameters:
- n_jobs: int, requested number of jobs
Returns:
int: actual number of jobs to use
"""
def submit(self, func, callback=None):
"""
Schedule function execution.
Parameters:
- func: callable, function to execute
- callback: callable, optional callback for result handling
Returns:
Future-like object representing the computation
"""
def retrieve_result(self, futures, timeout=None):
"""
Retrieve results from submitted computations.
Parameters:
- futures: list of future objects
- timeout: float, timeout in seconds
Returns:
Generator yielding (future, result) pairs
"""
class StoreBackendBase:
"""
Abstract base class for storage backends.
Subclass this to implement custom storage solutions
for Memory caching (e.g., cloud storage, databases).
"""
def _open_item(self, f, mode):
"""
Open item in storage backend.
Parameters:
- f: file identifier
- mode: str, file opening mode
Returns:
File-like object
"""
def _item_exists(self, location):
"""
Check if item exists in storage.
Parameters:
- location: str, item location identifier
Returns:
bool: True if item exists
"""
def _move_item(self, src, dst):
"""
Move item within storage backend.
Parameters:
- src: str, source location
- dst: str, destination location
"""
def clear_item(self, call_id):
"""
Clear single cached item.
Parameters:
- call_id: str, unique identifier for cached call
"""
def clear_path(self, path):
"""
Clear all items at specified path.
Parameters:
- path: str, path to clear
"""
def clear(self):
"""Clear all items in storage backend."""Usage Examples:
from joblib import ParallelBackendBase, StoreBackendBase, register_parallel_backend, register_store_backend
# Custom parallel backend example
class GPUBackend(ParallelBackendBase):
"""Example GPU computing backend."""
supports_timeout = True
default_n_jobs = 4 # Number of GPU streams
def __init__(self, device_id=0):
self.device_id = device_id
def effective_n_jobs(self, n_jobs):
# Limit to available GPU streams
return min(n_jobs, 8)
def submit(self, func, callback=None):
# Submit computation to GPU
# Return GPU future object
pass
def retrieve_result(self, futures, timeout=None):
# Retrieve results from GPU
for future in futures:
yield future, future.result(timeout=timeout)
# Register custom backend
register_parallel_backend('gpu', GPUBackend)
# Custom storage backend example
class RedisStoreBackend(StoreBackendBase):
"""Example Redis storage backend for caching."""
def __init__(self, host='localhost', port=6379, db=0):
import redis
self.redis_client = redis.Redis(host=host, port=port, db=db)
def _item_exists(self, location):
return self.redis_client.exists(location)
def _open_item(self, f, mode):
# Implement Redis-based file-like object
pass
def clear_item(self, call_id):
self.redis_client.delete(call_id)
def clear(self):
self.redis_client.flushdb()
# Register custom storage backend
register_store_backend('redis', RedisStoreBackend)
# Use custom backends
from joblib import Memory, Parallel, delayed, parallel_backend
# Use custom storage
mem = Memory(backend='redis', backend_options={'host': 'cache-server'})
# Use custom parallel backend
with parallel_backend('gpu', device_id=1):
    results = Parallel(n_jobs=4)(delayed(gpu_function)(i) for i in range(100))

Registration and management of compression algorithms for persistence operations.
def register_compressor(compressor_name, compressor, force=False):
"""
Register a new compressor for use with dump/load operations.
Parameters:
- compressor_name: str, name to identify the compressor
- compressor: compressor object implementing required interface
- force: bool, whether to overwrite existing compressor with same name
Raises:
ValueError: if compressor_name already exists and force=False
"""Usage Examples:
from joblib import register_compressor, dump, load
# Example custom compressor (simplified)
class CustomCompressor:
"""Example custom compression algorithm."""
def compress(self, data):
# Implement compression logic
return compressed_data
def decompress(self, compressed_data):
# Implement decompression logic
return original_data
# Register custom compressor
custom_comp = CustomCompressor()
register_compressor('custom', custom_comp)
# Use custom compressor
data = {'large_array': np.random.random(100000)}
dump(data, 'data_custom.pkl', compress='custom')
loaded_data = load('data_custom.pkl')
# Register external compressor library
try:
import snappy
class SnappyCompressor:
def compress(self, data):
return snappy.compress(data)
def decompress(self, compressed_data):
return snappy.decompress(compressed_data)
register_compressor('snappy', SnappyCompressor())
# Use snappy compression for fast compression/decompression
dump(data, 'data_snappy.pkl', compress='snappy')
except ImportError:
print("Snappy not available")from joblib import Memory, hash
from joblib._store_backends import StoreBackendBase
import time
class TimeBasedCacheBackend(StoreBackendBase):
"""Cache backend with automatic expiration."""
def __init__(self, base_backend, ttl_seconds=3600):
self.base_backend = base_backend
self.ttl_seconds = ttl_seconds
self.timestamps = {}
def _item_exists(self, location):
if not self.base_backend._item_exists(location):
return False
# Check if item has expired
timestamp = self.timestamps.get(location, 0)
if time.time() - timestamp > self.ttl_seconds:
self.clear_item(location)
return False
return True
def _open_item(self, f, mode):
if 'w' in mode:
# Record timestamp when writing
self.timestamps[f] = time.time()
return self.base_backend._open_item(f, mode)
# Use time-based caching
register_store_backend('ttl', TimeBasedCacheBackend)
mem = Memory('./cache', backend='ttl', backend_options={'ttl_seconds': 1800})

Example: extending Logger with system performance monitoring.

from joblib import Logger, Parallel, delayed
import time
import psutil
class PerformanceLogger(Logger):
"""Logger with system performance monitoring."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.start_time = None
self.start_memory = None
def start_monitoring(self):
self.start_time = time.time()
self.start_memory = psutil.virtual_memory().used
self.info("Performance monitoring started")
def log_performance(self, operation_name):
if self.start_time:
elapsed = time.time() - self.start_time
current_memory = psutil.virtual_memory().used
memory_delta = current_memory - self.start_memory
self.info(f"{operation_name} completed:")
self.info(f" Time: {elapsed:.2f} seconds")
self.info(f" Memory change: {memory_delta / 1024**2:.1f} MB")
self.info(f" CPU usage: {psutil.cpu_percent()}%")
# Use performance monitoring
perf_logger = PerformanceLogger(name="Computation")
perf_logger.start_monitoring()
# Perform computation
results = Parallel(n_jobs=4)(delayed(expensive_function)(i) for i in range(100))
perf_logger.log_performance("Parallel computation")Install with Tessl CLI
npx tessl i tessl/pypi-joblib